) REGISTRY.get(value.getClass());
+ encoder.encode(writer, value, ENCODER_CONTEXT);
}
private static MessageSettings getPayloadMessageSettings(final SplittablePayload.Type type, final MessageSettings settings) {
MessageSettings payloadMessageSettings = settings;
if (type != SplittablePayload.Type.INSERT) {
payloadMessageSettings = createMessageSettingsBuilder(settings)
- .maxDocumentSize(settings.getMaxDocumentSize() + DOCUMENT_HEADROOM)
+ .maxDocumentSize(settings.getMaxDocumentSize() + DOCUMENT_HEADROOM_SIZE)
.build();
}
return payloadMessageSettings;
@@ -102,7 +198,7 @@ private static MessageSettings getPayloadMessageSettings(final SplittablePayload
private static MessageSettings getDocumentMessageSettings(final MessageSettings settings) {
return createMessageSettingsBuilder(settings)
- .maxMessageSize(settings.getMaxDocumentSize() + DOCUMENT_HEADROOM)
+ .maxMessageSize(settings.getMaxDocumentSize() + DOCUMENT_HEADROOM_SIZE)
.build();
}
@@ -126,8 +222,50 @@ private static boolean exceedsLimits(final MessageSettings settings, final int m
return false;
}
+ /**
+ * A {@link BsonWriter} that allows appending key/value pairs to a document that has been fully written to a {@link BsonOutput}.
+ */
+ private static final class AppendingBsonWriter extends LevelCountingBsonWriter implements AutoCloseable {
+ private static final int INITIAL_LEVEL = DEFAULT_INITIAL_LEVEL + 1;
- private BsonWriterHelper() {
+ /**
+ * @param bsonOutputWithDocument A {@link BsonOutput} {@linkplain BsonOutput#getPosition() positioned}
+ * immediately after the end of the document.
+ * @param documentStartPosition The {@linkplain BsonOutput#getPosition() position} of the start of the document
+ * in {@code bsonOutputWithDocument}.
+ */
+ AppendingBsonWriter(final BsonOutput bsonOutputWithDocument, final int documentStartPosition) {
+ super(
+ new InternalAppendingBsonBinaryWriter(bsonOutputWithDocument, documentStartPosition),
+ INITIAL_LEVEL);
+ }
+
+ @Override
+ public void writeEndDocument() {
+ assertTrue(getCurrentLevel() > INITIAL_LEVEL);
+ super.writeEndDocument();
+ }
+
+ @Override
+ public void close() {
+ try (InternalAppendingBsonBinaryWriter writer = (InternalAppendingBsonBinaryWriter) getBsonWriter()) {
+ writer.writeEndDocument();
+ }
+ }
+
+ private static final class InternalAppendingBsonBinaryWriter extends BsonBinaryWriter {
+ InternalAppendingBsonBinaryWriter(final BsonOutput bsonOutputWithDocument, final int documentStartPosition) {
+ super(bsonOutputWithDocument);
+ int documentEndPosition = bsonOutputWithDocument.getPosition();
+ int bsonDocumentEndingSize = 1;
+ int appendFromPosition = documentEndPosition - bsonDocumentEndingSize;
+ bsonOutputWithDocument.truncateToPosition(appendFromPosition);
+ setState(State.NAME);
+ setContext(new Context(null, BsonContextType.DOCUMENT, documentStartPosition));
+ }
+ }
}
+ private BsonWriterHelper() {
+ }
}
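
For orientation, the following stand-alone sketch reproduces by hand the idea behind AppendingBsonWriter (and BsonWriterHelper.appendElementsToDocument, used from CommandMessage below): re-open a fully written BSON document, append an element, re-terminate it, and backpatch its length. It uses only public org.bson APIs; the element name and value are illustrative, not taken from the driver.

import org.bson.BsonBinaryWriter;
import org.bson.io.BasicOutputBuffer;

public final class AppendByHandSketch {
    public static void main(final String[] args) {
        BasicOutputBuffer out = new BasicOutputBuffer();
        try (BsonBinaryWriter writer = new BsonBinaryWriter(out)) {
            writer.writeStartDocument();
            writer.writeInt32("ok", 1);
            writer.writeEndDocument(); // document is now complete: int32 length, elements, 0x00 terminator
        }
        int documentStartPosition = 0;
        // Re-open the document by dropping the trailing 0x00 terminator ...
        out.truncateToPosition(out.getPosition() - 1);
        // ... append one element by hand (0x10 is the int32 element type) ...
        out.writeByte(0x10);
        out.writeCString("appended");
        out.writeInt32(42);
        // ... then re-terminate and backpatch the total document length.
        out.writeByte(0x00);
        out.writeInt32(documentStartPosition, out.getPosition() - documentStartPosition);
    }
}
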
diff --git a/driver-core/src/main/com/mongodb/internal/connection/ByteBufferBsonOutput.java b/driver-core/src/main/com/mongodb/internal/connection/ByteBufferBsonOutput.java
index 5cd2000d879..40df1b867fd 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/ByteBufferBsonOutput.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/ByteBufferBsonOutput.java
@@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.List;
+import static com.mongodb.assertions.Assertions.assertTrue;
import static com.mongodb.assertions.Assertions.notNull;
/**
@@ -52,6 +53,19 @@ public ByteBufferBsonOutput(final BufferProvider bufferProvider) {
this.bufferProvider = notNull("bufferProvider", bufferProvider);
}
+ /**
+ * Creates a new empty {@link ByteBufferBsonOutput.Branch},
+ * which gets merged into this {@link ByteBufferBsonOutput} on {@link ByteBufferBsonOutput.Branch#close()}
+ * by appending its data without copying it.
+ * If multiple branches are created, they are merged in the order they are {@linkplain ByteBufferBsonOutput.Branch#close() closed}.
+ * {@linkplain #close() Closing} this {@link ByteBufferBsonOutput} does not {@linkplain ByteBufferBsonOutput.Branch#close() close} the branch.
+ *
+ * @return A new {@link ByteBufferBsonOutput.Branch}.
+ */
+ public ByteBufferBsonOutput.Branch branch() {
+ return new ByteBufferBsonOutput.Branch(this);
+ }
+
@Override
public void writeBytes(final byte[] bytes, final int offset, final int length) {
ensureOpen();
@@ -156,7 +170,9 @@ public int pipe(final OutputStream out) throws IOException {
@Override
public void truncateToPosition(final int newPosition) {
ensureOpen();
-
+ if (newPosition == position) {
+ return;
+ }
if (newPosition > position || newPosition < 0) {
throw new IllegalArgumentException();
}
@@ -174,36 +190,89 @@ public void truncateToPosition(final int newPosition) {
position = newPosition;
}
+ /**
+ * The {@link #flush()} method of {@link ByteBufferBsonOutput} and of its subclasses does nothing.
+ */
+ @Override
+ public final void flush() throws IOException {
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Idempotent.
+ */
@Override
public void close() {
- for (final ByteBuf cur : bufferList) {
- cur.release();
+ if (isOpen()) {
+ for (final ByteBuf cur : bufferList) {
+ cur.release();
+ }
+ bufferList.clear();
+ closed = true;
}
- bufferList.clear();
- closed = true;
}
private BufferPositionPair getBufferPositionPair(final int absolutePosition) {
int positionInBuffer = absolutePosition;
int bufferIndex = 0;
- int bufferSize = INITIAL_BUFFER_SIZE;
+ int bufferSize = bufferList.get(bufferIndex).position();
int startPositionOfBuffer = 0;
while (startPositionOfBuffer + bufferSize <= absolutePosition) {
bufferIndex++;
startPositionOfBuffer += bufferSize;
positionInBuffer -= bufferSize;
- bufferSize = bufferList.get(bufferIndex).limit();
+ bufferSize = bufferList.get(bufferIndex).position();
}
return new BufferPositionPair(bufferIndex, positionInBuffer);
}
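
The change above sizes each traversed buffer by its write position, that is, by the number of bytes actually written into it, rather than by its limit, since only written bytes contribute to the absolute position. A tiny stand-alone illustration of the same index arithmetic, with made-up byte counts:

// Buffers hold 10, 7 and 3 written bytes; absolute position 15 lands 5 bytes into the buffer at index 1.
static int[] toBufferIndexAndPosition(final int[] writtenBytesPerBuffer, final int absolutePosition) {
    int bufferIndex = 0;
    int positionInBuffer = absolutePosition;
    while (positionInBuffer >= writtenBytesPerBuffer[bufferIndex]) {
        positionInBuffer -= writtenBytesPerBuffer[bufferIndex];
        bufferIndex++;
    }
    return new int[] {bufferIndex, positionInBuffer};
}
// toBufferIndexAndPosition(new int[] {10, 7, 3}, 15) returns {1, 5}
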
private void ensureOpen() {
- if (closed) {
+ if (!isOpen()) {
throw new IllegalStateException("The output is closed");
}
}
+ boolean isOpen() {
+ return !closed;
+ }
+
+ /**
+ * @see #branch()
+ */
+ private void merge(final ByteBufferBsonOutput branch) {
+ assertTrue(branch instanceof ByteBufferBsonOutput.Branch);
+ branch.bufferList.forEach(ByteBuf::retain);
+ bufferList.addAll(branch.bufferList);
+ curBufferIndex += branch.curBufferIndex + 1;
+ position += branch.position;
+ }
+
+ public static final class Branch extends ByteBufferBsonOutput {
+ private final ByteBufferBsonOutput parent;
+
+ private Branch(final ByteBufferBsonOutput parent) {
+ super(parent.bufferProvider);
+ this.parent = parent;
+ }
+
+ /**
+ * @see #branch()
+ */
+ @Override
+ public void close() {
+ if (isOpen()) {
+ try {
+ assertTrue(parent.isOpen());
+ parent.merge(this);
+ } finally {
+ super.close();
+ }
+ }
+ }
+ }
+
private static final class BufferPositionPair {
private final int bufferIndex;
private int position;
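
A rough usage sketch of the branch()/merge() machinery added to ByteBufferBsonOutput above (imports of ByteBufferBsonOutput and BufferProvider from com.mongodb.internal.connection assumed); the strings are placeholders. Because try-with-resources closes resources in reverse declaration order, the branch declared last is merged first, which is presumably why writeOpMsg in CommandMessage (below) declares bsonOutputBranch2 before bsonOutputBranch1.

static void branchSketch(final BufferProvider bufferProvider) {
    try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
        out.writeInt32(0); // written directly to the parent, stays first
        try (ByteBufferBsonOutput.Branch second = out.branch();
             ByteBufferBsonOutput.Branch first = out.branch()) {
            first.writeCString("closed first, so merged into the parent first");
            second.writeCString("closed second, so merged second");
        } // each close() appends the branch's buffers to the parent without copying
        // out now holds the int32, then the bytes of `first`, then the bytes of `second`
    }
}
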
diff --git a/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java b/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java
index bac2a86e61d..46eabab21bb 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java
@@ -23,6 +23,7 @@
import com.mongodb.ServerApi;
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.internal.TimeoutContext;
+import com.mongodb.internal.connection.MessageSequences.EmptyMessageSequences;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.lang.Nullable;
import org.bson.BsonArray;
@@ -34,7 +35,6 @@
import org.bson.BsonString;
import org.bson.ByteBuf;
import org.bson.FieldNameValidator;
-import org.bson.io.BsonOutput;
import java.io.ByteArrayOutputStream;
import java.io.UnsupportedEncodingException;
@@ -45,12 +45,17 @@
import static com.mongodb.ReadPreference.primary;
import static com.mongodb.ReadPreference.primaryPreferred;
import static com.mongodb.assertions.Assertions.assertFalse;
+import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.assertTrue;
+import static com.mongodb.assertions.Assertions.fail;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.connection.ClusterConnectionMode.LOAD_BALANCED;
import static com.mongodb.connection.ClusterConnectionMode.SINGLE;
import static com.mongodb.connection.ServerType.SHARD_ROUTER;
import static com.mongodb.connection.ServerType.STANDALONE;
+import static com.mongodb.internal.connection.BsonWriterHelper.appendElementsToDocument;
+import static com.mongodb.internal.connection.BsonWriterHelper.backpatchLength;
+import static com.mongodb.internal.connection.BsonWriterHelper.writeDocumentsOfDualMessageSequences;
import static com.mongodb.internal.connection.BsonWriterHelper.writePayload;
import static com.mongodb.internal.connection.ByteBufBsonDocument.createList;
import static com.mongodb.internal.connection.ByteBufBsonDocument.createOne;
@@ -64,43 +69,57 @@
* This class is not part of the public API and may be removed or changed at any time
*/
public final class CommandMessage extends RequestMessage {
+ /**
+ * Specifies that the `OP_MSG` section payload is a BSON document.
+ */
+ private static final byte PAYLOAD_TYPE_0_DOCUMENT = 0;
+ /**
+ * Specifies that the `OP_MSG` section payload is a sequence of BSON documents.
+ */
+ private static final byte PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE = 1;
+
private final MongoNamespace namespace;
private final BsonDocument command;
private final FieldNameValidator commandFieldNameValidator;
private final ReadPreference readPreference;
private final boolean exhaustAllowed;
- private final SplittablePayload payload;
- private final FieldNameValidator payloadFieldNameValidator;
+ private final MessageSequences sequences;
private final boolean responseExpected;
+ /**
+ * {@code null} iff either {@link #sequences} is not of the {@link DualMessageSequences} type,
+ * or it is of that type, but it has not been {@linkplain #encodeMessageBodyWithMetadata(ByteBufferBsonOutput, OperationContext) encoded}.
+ */
+ @Nullable
+ private Boolean dualMessageSequencesRequireResponse;
private final ClusterConnectionMode clusterConnectionMode;
private final ServerApi serverApi;
CommandMessage(final MongoNamespace namespace, final BsonDocument command, final FieldNameValidator commandFieldNameValidator,
final ReadPreference readPreference, final MessageSettings settings, final ClusterConnectionMode clusterConnectionMode,
@Nullable final ServerApi serverApi) {
- this(namespace, command, commandFieldNameValidator, readPreference, settings, true, null, null,
+ this(namespace, command, commandFieldNameValidator, readPreference, settings, true, EmptyMessageSequences.INSTANCE,
clusterConnectionMode, serverApi);
}
CommandMessage(final MongoNamespace namespace, final BsonDocument command, final FieldNameValidator commandFieldNameValidator,
final ReadPreference readPreference, final MessageSettings settings, final boolean exhaustAllowed,
final ClusterConnectionMode clusterConnectionMode, @Nullable final ServerApi serverApi) {
- this(namespace, command, commandFieldNameValidator, readPreference, settings, true, exhaustAllowed, null, null,
+ this(namespace, command, commandFieldNameValidator, readPreference, settings, true, exhaustAllowed, EmptyMessageSequences.INSTANCE,
clusterConnectionMode, serverApi);
}
CommandMessage(final MongoNamespace namespace, final BsonDocument command, final FieldNameValidator commandFieldNameValidator,
final ReadPreference readPreference, final MessageSettings settings, final boolean responseExpected,
- @Nullable final SplittablePayload payload, @Nullable final FieldNameValidator payloadFieldNameValidator,
+ final MessageSequences sequences,
final ClusterConnectionMode clusterConnectionMode, @Nullable final ServerApi serverApi) {
- this(namespace, command, commandFieldNameValidator, readPreference, settings, responseExpected, false, payload,
- payloadFieldNameValidator, clusterConnectionMode, serverApi);
+ this(namespace, command, commandFieldNameValidator, readPreference, settings, responseExpected, false,
+ sequences, clusterConnectionMode, serverApi);
}
CommandMessage(final MongoNamespace namespace, final BsonDocument command, final FieldNameValidator commandFieldNameValidator,
final ReadPreference readPreference, final MessageSettings settings,
final boolean responseExpected, final boolean exhaustAllowed,
- @Nullable final SplittablePayload payload, @Nullable final FieldNameValidator payloadFieldNameValidator,
+ final MessageSequences sequences,
final ClusterConnectionMode clusterConnectionMode, @Nullable final ServerApi serverApi) {
super(namespace.getFullName(), getOpCode(settings, clusterConnectionMode, serverApi), settings);
this.namespace = namespace;
@@ -108,9 +127,9 @@ public final class CommandMessage extends RequestMessage {
this.commandFieldNameValidator = commandFieldNameValidator;
this.readPreference = readPreference;
this.responseExpected = responseExpected;
+ this.dualMessageSequencesRequireResponse = null;
this.exhaustAllowed = exhaustAllowed;
- this.payload = payload;
- this.payloadFieldNameValidator = payloadFieldNameValidator;
+ this.sequences = sequences;
this.clusterConnectionMode = notNull("clusterConnectionMode", clusterConnectionMode);
this.serverApi = serverApi;
assertTrue(useOpMsg() || responseExpected);
@@ -119,8 +138,8 @@ public final class CommandMessage extends RequestMessage {
/**
* Create a BsonDocument representing the logical document encoded by an OP_MSG.
*
- * The returned document will contain all the fields from the Body (Kind 0) Section, as well as all fields represented by
- * OP_MSG Document Sequence (Kind 1) Sections.
+ * The returned document will contain all the fields from the `PAYLOAD_TYPE_0_DOCUMENT` section, as well as all fields represented by
+ * `PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE` sections.
*/
BsonDocument getCommandDocument(final ByteBufferBsonOutput bsonOutput) {
List<ByteBuf> byteBuffers = bsonOutput.getByteBuffers();
@@ -130,14 +149,14 @@ BsonDocument getCommandDocument(final ByteBufferBsonOutput bsonOutput) {
byteBuf.position(getEncodingMetadata().getFirstDocumentPosition());
ByteBufBsonDocument byteBufBsonDocument = createOne(byteBuf);
- // If true, it means there is at least one Kind 1:Document Sequence in the OP_MSG
+ // If true, it means there is at least one `PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE` section in the OP_MSG
if (byteBuf.hasRemaining()) {
BsonDocument commandBsonDocument = byteBufBsonDocument.toBaseBsonDocument();
// Each loop iteration processes one Document Sequence
// When there are no more bytes remaining, there are no more Document Sequences
while (byteBuf.hasRemaining()) {
- // skip reading the payload type, we know it is 1
+ // skip reading the payload type, we know it is `PAYLOAD_TYPE_1`
byteBuf.position(byteBuf.position() + 1);
int sequenceStart = byteBuf.position();
int sequenceSizeInBytes = byteBuf.getInt();
@@ -170,7 +189,7 @@ BsonDocument getCommandDocument(final ByteBufferBsonOutput bsonOutput) {
/**
* Get the field name from a buffer positioned at the start of the document sequence identifier of an OP_MSG Section of type
- * Document Sequence (Kind 1).
+ * `PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE`.
*
* Upon normal completion of the method, the buffer will be positioned at the start of the first BSON object in the sequence.
*/
@@ -192,7 +211,15 @@ boolean isResponseExpected() {
if (responseExpected) {
return true;
} else {
- return payload != null && payload.isOrdered() && payload.hasAnotherSplit();
+ if (sequences instanceof SplittablePayload) {
+ SplittablePayload payload = (SplittablePayload) sequences;
+ return payload.isOrdered() && payload.hasAnotherSplit();
+ } else if (sequences instanceof DualMessageSequences) {
+ return assertNotNull(dualMessageSequencesRequireResponse);
+ } else if (!(sequences instanceof EmptyMessageSequences)) {
+ fail(sequences.toString());
+ }
+ return false;
}
}
@@ -201,47 +228,74 @@ MongoNamespace getNamespace() {
}
@Override
- protected EncodingMetadata encodeMessageBodyWithMetadata(final BsonOutput bsonOutput, final OperationContext operationContext) {
+ protected EncodingMetadata encodeMessageBodyWithMetadata(final ByteBufferBsonOutput bsonOutput, final OperationContext operationContext) {
+ int commandStartPosition = useOpMsg() ? writeOpMsg(bsonOutput, operationContext) : writeOpQuery(bsonOutput);
+ return new EncodingMetadata(commandStartPosition);
+ }
+
+ @SuppressWarnings("try")
+ private int writeOpMsg(final ByteBufferBsonOutput bsonOutput, final OperationContext operationContext) {
int messageStartPosition = bsonOutput.getPosition() - MESSAGE_PROLOGUE_LENGTH;
- int commandStartPosition;
- if (useOpMsg()) {
- int flagPosition = bsonOutput.getPosition();
- bsonOutput.writeInt32(0); // flag bits
- bsonOutput.writeByte(0); // payload type
- commandStartPosition = bsonOutput.getPosition();
-
- addDocument(command, bsonOutput, commandFieldNameValidator, getExtraElements(operationContext));
-
- if (payload != null) {
- bsonOutput.writeByte(1); // payload type
- int payloadBsonOutputStartPosition = bsonOutput.getPosition();
- bsonOutput.writeInt32(0); // size
- bsonOutput.writeCString(payload.getPayloadName());
- writePayload(new BsonBinaryWriter(bsonOutput, payloadFieldNameValidator), bsonOutput, getSettings(),
- messageStartPosition, payload, getSettings().getMaxDocumentSize());
-
- int payloadBsonOutputLength = bsonOutput.getPosition() - payloadBsonOutputStartPosition;
- bsonOutput.writeInt32(payloadBsonOutputStartPosition, payloadBsonOutputLength);
+ int flagPosition = bsonOutput.getPosition();
+ bsonOutput.writeInt32(0); // flag bits
+ bsonOutput.writeByte(PAYLOAD_TYPE_0_DOCUMENT);
+ int commandStartPosition = bsonOutput.getPosition();
+ List<BsonElement> extraElements = getExtraElements(operationContext);
+
+ int commandDocumentSizeInBytes = writeDocument(command, bsonOutput, commandFieldNameValidator);
+ if (sequences instanceof SplittablePayload) {
+ appendElementsToDocument(bsonOutput, commandStartPosition, extraElements);
+ SplittablePayload payload = (SplittablePayload) sequences;
+ try (FinishOpMsgSectionWithPayloadType1 finishSection = startOpMsgSectionWithPayloadType1(
+ bsonOutput, payload.getPayloadName())) {
+ writePayload(
+ new BsonBinaryWriter(bsonOutput, payload.getFieldNameValidator()),
+ bsonOutput, getSettings(), messageStartPosition, payload, getSettings().getMaxDocumentSize());
}
-
- // Write the flag bits
- bsonOutput.writeInt32(flagPosition, getOpMsgFlagBits());
+ } else if (sequences instanceof DualMessageSequences) {
+ DualMessageSequences dualMessageSequences = (DualMessageSequences) sequences;
+ try (ByteBufferBsonOutput.Branch bsonOutputBranch2 = bsonOutput.branch();
+ ByteBufferBsonOutput.Branch bsonOutputBranch1 = bsonOutput.branch()) {
+ DualMessageSequences.EncodeDocumentsResult encodeDocumentsResult;
+ try (FinishOpMsgSectionWithPayloadType1 finishSection1 = startOpMsgSectionWithPayloadType1(
+ bsonOutputBranch1, dualMessageSequences.getFirstSequenceId());
+ FinishOpMsgSectionWithPayloadType1 finishSection2 = startOpMsgSectionWithPayloadType1(
+ bsonOutputBranch2, dualMessageSequences.getSecondSequenceId())) {
+ encodeDocumentsResult = writeDocumentsOfDualMessageSequences(
+ dualMessageSequences, commandDocumentSizeInBytes, bsonOutputBranch1,
+ bsonOutputBranch2, getSettings());
+ }
+ dualMessageSequencesRequireResponse = encodeDocumentsResult.isServerResponseRequired();
+ extraElements.addAll(encodeDocumentsResult.getExtraElements());
+ appendElementsToDocument(bsonOutput, commandStartPosition, extraElements);
+ }
+ } else if (sequences instanceof EmptyMessageSequences) {
+ appendElementsToDocument(bsonOutput, commandStartPosition, extraElements);
} else {
- bsonOutput.writeInt32(0);
- bsonOutput.writeCString(namespace.getFullName());
- bsonOutput.writeInt32(0);
- bsonOutput.writeInt32(-1);
+ fail(sequences.toString());
+ }
+
+ // Write the flag bits
+ bsonOutput.writeInt32(flagPosition, getOpMsgFlagBits());
+ return commandStartPosition;
+ }
- commandStartPosition = bsonOutput.getPosition();
+ private int writeOpQuery(final ByteBufferBsonOutput bsonOutput) {
+ bsonOutput.writeInt32(0);
+ bsonOutput.writeCString(namespace.getFullName());
+ bsonOutput.writeInt32(0);
+ bsonOutput.writeInt32(-1);
- List<BsonElement> elements = null;
- if (serverApi != null) {
- elements = new ArrayList<>(3);
- addServerApiElements(elements);
- }
- addDocument(command, bsonOutput, commandFieldNameValidator, elements);
+ int commandStartPosition = bsonOutput.getPosition();
+
+ List<BsonElement> elements = null;
+ if (serverApi != null) {
+ elements = new ArrayList<>(3);
+ addServerApiElements(elements);
}
- return new EncodingMetadata(commandStartPosition);
+ writeDocument(command, bsonOutput, commandFieldNameValidator);
+ appendElementsToDocument(bsonOutput, commandStartPosition, elements);
+ return commandStartPosition;
}
private int getOpMsgFlagBits() {
@@ -269,7 +323,7 @@ private List getExtraElements(final OperationContext operationConte
SessionContext sessionContext = operationContext.getSessionContext();
TimeoutContext timeoutContext = operationContext.getTimeoutContext();
- List<BsonElement> extraElements = new ArrayList<>();
+ ArrayList<BsonElement> extraElements = new ArrayList<>();
if (!getSettings().isCryptd()) {
timeoutContext.runMaxTimeMS(maxTimeMS ->
extraElements.add(new BsonElement("maxTimeMS", new BsonInt64(maxTimeMS)))
@@ -341,6 +395,19 @@ private void addReadConcernDocument(final List extraElements, final
}
}
+ /**
+ * @param sequenceId The identifier of the sequence contained in the {@code OP_MSG} section to be written.
+ * @see OP_MSG
+ */
+ private FinishOpMsgSectionWithPayloadType1 startOpMsgSectionWithPayloadType1(final ByteBufferBsonOutput bsonOutput, final String sequenceId) {
+ bsonOutput.writeByte(PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE);
+ int sequenceStart = bsonOutput.getPosition();
+ // size to be patched back later
+ bsonOutput.writeInt32(0);
+ bsonOutput.writeCString(sequenceId);
+ return () -> backpatchLength(sequenceStart, bsonOutput);
+ }
+
private static OpCode getOpCode(final MessageSettings settings, final ClusterConnectionMode clusterConnectionMode,
@Nullable final ServerApi serverApi) {
return isServerVersionKnown(settings) || clusterConnectionMode == LOAD_BALANCED || serverApi != null
@@ -351,4 +418,9 @@ private static OpCode getOpCode(final MessageSettings settings, final ClusterCon
private static boolean isServerVersionKnown(final MessageSettings settings) {
return settings.getMaxWireVersion() >= FOUR_DOT_ZERO_WIRE_VERSION;
}
+
+ @FunctionalInterface
+ private interface FinishOpMsgSectionWithPayloadType1 extends AutoCloseable {
+ void close();
+ }
}
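
For reference, startOpMsgSectionWithPayloadType1 plus the backpatchLength call in the returned FinishOpMsgSectionWithPayloadType1 produce the standard OP_MSG kind-1 section layout: a one-byte payload type, an int32 size covering the size field, the identifier and the documents (but not the type byte), a NUL-terminated sequence identifier, then the BSON documents back to back. A by-hand sketch of the same bytes, using BasicOutputBuffer for brevity; "documents" is, for example, the identifier an insert command uses:

import org.bson.io.BasicOutputBuffer;

static BasicOutputBuffer writeEmptyDocumentSequenceSection() {
    BasicOutputBuffer out = new BasicOutputBuffer();
    out.writeByte(1);              // PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE
    int sequenceStart = out.getPosition();
    out.writeInt32(0);             // size placeholder, patched below
    out.writeCString("documents"); // sequence identifier
    // the BSON documents of the sequence would be written here, back to back
    out.writeInt32(sequenceStart, out.getPosition() - sequenceStart); // backpatch the section size
    return out;
}
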
diff --git a/driver-core/src/main/com/mongodb/internal/connection/CommandProtocolImpl.java b/driver-core/src/main/com/mongodb/internal/connection/CommandProtocolImpl.java
index de9e0666d40..eb4d6d49516 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/CommandProtocolImpl.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/CommandProtocolImpl.java
@@ -26,17 +26,15 @@
import org.bson.FieldNameValidator;
import org.bson.codecs.Decoder;
-import static com.mongodb.assertions.Assertions.isTrueArgument;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.connection.ProtocolHelper.getMessageSettings;
class CommandProtocolImpl<T> implements CommandProtocol<T> {
private final MongoNamespace namespace;
private final BsonDocument command;
- private final SplittablePayload payload;
+ private final MessageSequences sequences;
private final ReadPreference readPreference;
private final FieldNameValidator commandFieldNameValidator;
- private final FieldNameValidator payloadFieldNameValidator;
private final Decoder commandResultDecoder;
private final boolean responseExpected;
private final ClusterConnectionMode clusterConnectionMode;
@@ -44,8 +42,7 @@ class CommandProtocolImpl implements CommandProtocol {
CommandProtocolImpl(final String database, final BsonDocument command, final FieldNameValidator commandFieldNameValidator,
@Nullable final ReadPreference readPreference, final Decoder<T> commandResultDecoder, final boolean responseExpected,
- @Nullable final SplittablePayload payload, @Nullable final FieldNameValidator payloadFieldNameValidator,
- final ClusterConnectionMode clusterConnectionMode, final OperationContext operationContext) {
+ final MessageSequences sequences, final ClusterConnectionMode clusterConnectionMode, final OperationContext operationContext) {
notNull("database", database);
this.namespace = new MongoNamespace(notNull("database", database), MongoNamespace.COMMAND_COLLECTION_NAME);
this.command = notNull("command", command);
@@ -53,13 +50,9 @@ class CommandProtocolImpl implements CommandProtocol {
this.readPreference = readPreference;
this.commandResultDecoder = notNull("commandResultDecoder", commandResultDecoder);
this.responseExpected = responseExpected;
- this.payload = payload;
- this.payloadFieldNameValidator = payloadFieldNameValidator;
+ this.sequences = sequences;
this.clusterConnectionMode = notNull("clusterConnectionMode", clusterConnectionMode);
this.operationContext = operationContext;
-
- isTrueArgument("payloadFieldNameValidator cannot be null if there is a payload.",
- payload == null || payloadFieldNameValidator != null);
}
@Nullable
@@ -87,13 +80,13 @@ public void executeAsync(final InternalConnection connection, final SingleResult
@Override
public CommandProtocolImpl<T> withSessionContext(final SessionContext sessionContext) {
return new CommandProtocolImpl<>(namespace.getDatabaseName(), command, commandFieldNameValidator, readPreference,
- commandResultDecoder, responseExpected, payload, payloadFieldNameValidator, clusterConnectionMode,
+ commandResultDecoder, responseExpected, sequences, clusterConnectionMode,
operationContext.withSessionContext(sessionContext));
}
private CommandMessage getCommandMessage(final InternalConnection connection) {
return new CommandMessage(namespace, command, commandFieldNameValidator, readPreference,
- getMessageSettings(connection.getDescription(), connection.getInitialServerDescription()), responseExpected, payload,
- payloadFieldNameValidator, clusterConnectionMode, operationContext.getServerApi());
+ getMessageSettings(connection.getDescription(), connection.getInitialServerDescription()), responseExpected,
+ sequences, clusterConnectionMode, operationContext.getServerApi());
}
}
diff --git a/driver-core/src/main/com/mongodb/internal/connection/CompressedMessage.java b/driver-core/src/main/com/mongodb/internal/connection/CompressedMessage.java
index 9880ef3fb0b..6764135daa1 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/CompressedMessage.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/CompressedMessage.java
@@ -17,7 +17,6 @@
package com.mongodb.internal.connection;
import org.bson.ByteBuf;
-import org.bson.io.BsonOutput;
import java.util.List;
@@ -37,7 +36,7 @@ class CompressedMessage extends RequestMessage {
}
@Override
- protected EncodingMetadata encodeMessageBodyWithMetadata(final BsonOutput bsonOutput, final OperationContext operationContext) {
+ protected EncodingMetadata encodeMessageBodyWithMetadata(final ByteBufferBsonOutput bsonOutput, final OperationContext operationContext) {
bsonOutput.writeInt32(wrappedOpcode.getValue());
bsonOutput.writeInt32(getWrappedMessageSize(wrappedMessageBuffers) - MESSAGE_HEADER_LENGTH);
bsonOutput.writeByte(compressor.getId());
diff --git a/driver-core/src/main/com/mongodb/internal/connection/Connection.java b/driver-core/src/main/com/mongodb/internal/connection/Connection.java
index 95094b240c1..219fb9ae6b9 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/Connection.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/Connection.java
@@ -51,7 +51,7 @@ T command(String database, BsonDocument command, FieldNameValidator fieldNam
@Nullable
<T> T command(String database, BsonDocument command, FieldNameValidator commandFieldNameValidator,
@Nullable ReadPreference readPreference, Decoder<T> commandResultDecoder, OperationContext operationContext,
- boolean responseExpected, @Nullable SplittablePayload payload, @Nullable FieldNameValidator payloadFieldNameValidator);
+ boolean responseExpected, MessageSequences sequences);
enum PinningMode {
diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java
index 8f3d0f09fd9..008cdbefcb7 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java
@@ -302,9 +302,9 @@ public T command(final String database, final BsonDocument command, final Fi
public <T> T command(final String database, final BsonDocument command, final FieldNameValidator commandFieldNameValidator,
@Nullable final ReadPreference readPreference, final Decoder<T> commandResultDecoder,
final OperationContext operationContext, final boolean responseExpected,
- @Nullable final SplittablePayload payload, @Nullable final FieldNameValidator payloadFieldNameValidator) {
+ final MessageSequences sequences) {
return wrapped.command(database, command, commandFieldNameValidator, readPreference, commandResultDecoder, operationContext,
- responseExpected, payload, payloadFieldNameValidator);
+ responseExpected, sequences);
}
@Override
@@ -364,10 +364,10 @@ public void commandAsync(final String database, final BsonDocument command,
@Override
public <T> void commandAsync(final String database, final BsonDocument command, final FieldNameValidator commandFieldNameValidator,
@Nullable final ReadPreference readPreference, final Decoder<T> commandResultDecoder,
- final OperationContext operationContext, final boolean responseExpected, @Nullable final SplittablePayload payload,
- @Nullable final FieldNameValidator payloadFieldNameValidator, final SingleResultCallback<T> callback) {
+ final OperationContext operationContext, final boolean responseExpected, final MessageSequences sequences,
+ final SingleResultCallback<T> callback) {
wrapped.commandAsync(database, command, commandFieldNameValidator, readPreference, commandResultDecoder,
- operationContext, responseExpected, payload, payloadFieldNameValidator, callback);
+ operationContext, responseExpected, sequences, callback);
}
@Override
diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerConnection.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerConnection.java
index 01d5f587fdc..143ef5b76ae 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerConnection.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerConnection.java
@@ -20,6 +20,7 @@
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.connection.ConnectionDescription;
import com.mongodb.internal.async.SingleResultCallback;
+import com.mongodb.internal.connection.MessageSequences.EmptyMessageSequences;
import com.mongodb.internal.diagnostics.logging.Logger;
import com.mongodb.internal.diagnostics.logging.Loggers;
import com.mongodb.internal.session.SessionContext;
@@ -70,18 +71,17 @@ public ConnectionDescription getDescription() {
@Override
public <T> T command(final String database, final BsonDocument command, final FieldNameValidator fieldNameValidator,
@Nullable final ReadPreference readPreference, final Decoder<T> commandResultDecoder, final OperationContext operationContext) {
- return command(database, command, fieldNameValidator, readPreference, commandResultDecoder, operationContext, true, null, null);
+ return command(database, command, fieldNameValidator, readPreference, commandResultDecoder, operationContext, true, EmptyMessageSequences.INSTANCE);
}
@Nullable
@Override
public <T> T command(final String database, final BsonDocument command, final FieldNameValidator commandFieldNameValidator,
@Nullable final ReadPreference readPreference, final Decoder<T> commandResultDecoder,
- final OperationContext operationContext, final boolean responseExpected,
- @Nullable final SplittablePayload payload, @Nullable final FieldNameValidator payloadFieldNameValidator) {
+ final OperationContext operationContext, final boolean responseExpected, final MessageSequences sequences) {
return executeProtocol(
new CommandProtocolImpl<>(database, command, commandFieldNameValidator, readPreference, commandResultDecoder,
- responseExpected, payload, payloadFieldNameValidator, clusterConnectionMode, operationContext),
+ responseExpected, sequences, clusterConnectionMode, operationContext),
operationContext.getSessionContext());
}
@@ -90,16 +90,15 @@ public void commandAsync(final String database, final BsonDocument command,
@Nullable final ReadPreference readPreference, final Decoder<T> commandResultDecoder, final OperationContext operationContext,
final SingleResultCallback<T> callback) {
commandAsync(database, command, fieldNameValidator, readPreference, commandResultDecoder,
- operationContext, true, null, null, callback);
+ operationContext, true, EmptyMessageSequences.INSTANCE, callback);
}
@Override
public <T> void commandAsync(final String database, final BsonDocument command, final FieldNameValidator commandFieldNameValidator,
@Nullable final ReadPreference readPreference, final Decoder<T> commandResultDecoder, final OperationContext operationContext,
- final boolean responseExpected, @Nullable final SplittablePayload payload,
- @Nullable final FieldNameValidator payloadFieldNameValidator, final SingleResultCallback<T> callback) {
+ final boolean responseExpected, final MessageSequences sequences, final SingleResultCallback<T> callback) {
executeProtocolAsync(new CommandProtocolImpl<>(database, command, commandFieldNameValidator, readPreference,
- commandResultDecoder, responseExpected, payload, payloadFieldNameValidator, clusterConnectionMode, operationContext),
+ commandResultDecoder, responseExpected, sequences, clusterConnectionMode, operationContext),
operationContext.getSessionContext(), callback);
}
diff --git a/driver-core/src/main/com/mongodb/internal/connection/DualMessageSequences.java b/driver-core/src/main/com/mongodb/internal/connection/DualMessageSequences.java
new file mode 100644
index 00000000000..0c5a3430c22
--- /dev/null
+++ b/driver-core/src/main/com/mongodb/internal/connection/DualMessageSequences.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.internal.connection;
+
+import org.bson.BsonBinaryWriter;
+import org.bson.BsonElement;
+import org.bson.FieldNameValidator;
+
+import java.util.List;
+
+/**
+ * Two sequences that may either be coupled or independent.
+ *
+ * This class is not part of the public API and may be removed or changed at any time.
+ */
+public abstract class DualMessageSequences extends MessageSequences {
+
+ private final String firstSequenceId;
+ private final FieldNameValidator firstFieldNameValidator;
+ private final String secondSequenceId;
+ private final FieldNameValidator secondFieldNameValidator;
+
+ protected DualMessageSequences(
+ final String firstSequenceId,
+ final FieldNameValidator firstFieldNameValidator,
+ final String secondSequenceId,
+ final FieldNameValidator secondFieldNameValidator) {
+ this.firstSequenceId = firstSequenceId;
+ this.firstFieldNameValidator = firstFieldNameValidator;
+ this.secondSequenceId = secondSequenceId;
+ this.secondFieldNameValidator = secondFieldNameValidator;
+ }
+
+ FieldNameValidator getFirstFieldNameValidator() {
+ return firstFieldNameValidator;
+ }
+
+ FieldNameValidator getSecondFieldNameValidator() {
+ return secondFieldNameValidator;
+ }
+
+ String getFirstSequenceId() {
+ return firstSequenceId;
+ }
+
+ String getSecondSequenceId() {
+ return secondSequenceId;
+ }
+
+ protected abstract EncodeDocumentsResult encodeDocuments(WritersProviderAndLimitsChecker writersProviderAndLimitsChecker);
+
+ /**
+ * @see #tryWrite(WriteAction)
+ */
+ public interface WritersProviderAndLimitsChecker {
+ /**
+ * Provides writers to the specified {@link WriteAction},
+ * {@linkplain WriteAction#doAndGetBatchCount(BsonBinaryWriter, BsonBinaryWriter) executes} it,
+ * checks the {@linkplain MessageSettings limits}.
+ *
+ * May be called multiple times per {@link #encodeDocuments(WritersProviderAndLimitsChecker)}.
+ */
+ WriteResult tryWrite(WriteAction write);
+
+ /**
+ * @see #doAndGetBatchCount(BsonBinaryWriter, BsonBinaryWriter)
+ */
+ interface WriteAction {
+ /**
+ * Writes documents to the sequences using the provided writers.
+ *
+ * @return The resulting batch count since the beginning of {@link #encodeDocuments(WritersProviderAndLimitsChecker)}.
+ * It is generally allowed to be greater than {@link MessageSettings#getMaxBatchCount()}.
+ */
+ int doAndGetBatchCount(BsonBinaryWriter firstWriter, BsonBinaryWriter secondWriter);
+ }
+
+ enum WriteResult {
+ FAIL_LIMIT_EXCEEDED,
+ OK_LIMIT_REACHED,
+ OK_LIMIT_NOT_REACHED
+ }
+ }
+
+ public static final class EncodeDocumentsResult {
+ private final boolean serverResponseRequired;
+ private final List<BsonElement> extraElements;
+
+ /**
+ * @param extraElements See {@link #getExtraElements()}.
+ */
+ public EncodeDocumentsResult(final boolean serverResponseRequired, final List<BsonElement> extraElements) {
+ this.serverResponseRequired = serverResponseRequired;
+ this.extraElements = extraElements;
+ }
+
+ boolean isServerResponseRequired() {
+ return serverResponseRequired;
+ }
+
+ /**
+ * {@linkplain BsonElement Key/value pairs} to be added to the document contained in the {@code OP_MSG} section with payload type 0.
+ */
+ List<BsonElement> getExtraElements() {
+ return extraElements;
+ }
+ }
+}
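
A rough sketch of what a concrete DualMessageSequences subclass might do with the contract above. The sequence identifiers, the validator parameter, and the one-document-per-tryWrite strategy are illustrative; that a failed tryWrite leaves the output untouched is an assumption made for the sketch, not something the interface states.

import java.util.List;
import org.bson.BsonDocument;
import org.bson.FieldNameValidator;
import org.bson.codecs.BsonDocumentCodec;
import org.bson.codecs.EncoderContext;

import static java.util.Collections.emptyList;

final class SketchMessageSequences extends DualMessageSequences {
    private final List<BsonDocument> firstSequenceDocuments; // hypothetical input
    private int encodedCount;

    SketchMessageSequences(final List<BsonDocument> firstSequenceDocuments, final FieldNameValidator validator) {
        super("firstSequence", validator, "secondSequence", validator);
        this.firstSequenceDocuments = firstSequenceDocuments;
    }

    @Override
    protected EncodeDocumentsResult encodeDocuments(final WritersProviderAndLimitsChecker checker) {
        boolean limitReached = false;
        while (!limitReached && encodedCount < firstSequenceDocuments.size()) {
            WritersProviderAndLimitsChecker.WriteResult result = checker.tryWrite((firstWriter, secondWriter) -> {
                // write one document to the first sequence; a real implementation may also use secondWriter
                new BsonDocumentCodec().encode(
                        firstWriter, firstSequenceDocuments.get(encodedCount), EncoderContext.builder().build());
                return encodedCount + 1;
            });
            if (result == WritersProviderAndLimitsChecker.WriteResult.FAIL_LIMIT_EXCEEDED) {
                break; // leave the remaining documents for the next message
            }
            encodedCount++;
            limitReached = result == WritersProviderAndLimitsChecker.WriteResult.OK_LIMIT_REACHED;
        }
        return new EncodeDocumentsResult(/* serverResponseRequired */ true, emptyList());
    }
}
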
diff --git a/driver-core/src/main/com/mongodb/internal/connection/ElementExtendingBsonWriter.java b/driver-core/src/main/com/mongodb/internal/connection/ElementExtendingBsonWriter.java
deleted file mode 100644
index d0ed5234d50..00000000000
--- a/driver-core/src/main/com/mongodb/internal/connection/ElementExtendingBsonWriter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright 2008-present MongoDB, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.mongodb.internal.connection;
-
-import org.bson.BsonBinaryWriter;
-import org.bson.BsonElement;
-import org.bson.BsonReader;
-
-import java.util.List;
-
-import static com.mongodb.internal.connection.BsonWriterHelper.writeElements;
-
-/**
- * This class is not part of the public API and may be removed or changed at any time
- */
-public class ElementExtendingBsonWriter extends LevelCountingBsonWriter {
- private final BsonBinaryWriter writer;
- private final List<BsonElement> extraElements;
-
-
- public ElementExtendingBsonWriter(final BsonBinaryWriter writer, final List<BsonElement> extraElements) {
- super(writer);
- this.writer = writer;
- this.extraElements = extraElements;
- }
-
- @Override
- public void writeEndDocument() {
- if (getCurrentLevel() == 0) {
- writeElements(writer, extraElements);
- }
- super.writeEndDocument();
- }
-
- @Override
- public void pipe(final BsonReader reader) {
- if (getCurrentLevel() == -1) {
- writer.pipe(reader, extraElements);
- } else {
- writer.pipe(reader);
- }
- }
-}
diff --git a/driver-core/src/main/com/mongodb/internal/connection/IdHoldingBsonWriter.java b/driver-core/src/main/com/mongodb/internal/connection/IdHoldingBsonWriter.java
index 4120dbdfb17..c73a5d4fd86 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/IdHoldingBsonWriter.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/IdHoldingBsonWriter.java
@@ -92,11 +92,11 @@ public void writeStartDocument() {
@Override
public void writeEndDocument() {
if (isWritingId()) {
- if (getIdBsonWriterCurrentLevel() >= 0) {
+ if (getIdBsonWriterCurrentLevel() > DEFAULT_INITIAL_LEVEL) {
getIdBsonWriter().writeEndDocument();
}
- if (getIdBsonWriterCurrentLevel() == -1) {
+ if (getIdBsonWriterCurrentLevel() == DEFAULT_INITIAL_LEVEL) {
if (id != null && id.isJavaScriptWithScope()) {
id = new BsonJavaScriptWithScope(id.asJavaScriptWithScope().getCode(), new RawBsonDocument(getBytes()));
} else if (id == null) {
@@ -105,7 +105,7 @@ public void writeEndDocument() {
}
}
- if (getCurrentLevel() == 0 && id == null) {
+ if (getCurrentLevel() == DEFAULT_INITIAL_LEVEL + 1 && id == null) {
id = fallbackId == null ? new BsonObjectId() : fallbackId;
writeObjectId(ID_FIELD_NAME, id.asObjectId().getValue());
}
@@ -115,7 +115,7 @@ public void writeEndDocument() {
@Override
public void writeStartArray() {
if (isWritingId()) {
- if (getIdBsonWriterCurrentLevel() == -1) {
+ if (getIdBsonWriterCurrentLevel() == DEFAULT_INITIAL_LEVEL) {
idFieldIsAnArray = true;
getIdBsonWriter().writeStartDocument();
getIdBsonWriter().writeName(ID_FIELD_NAME);
@@ -129,7 +129,7 @@ public void writeStartArray() {
public void writeStartArray(final String name) {
setCurrentFieldName(name);
if (isWritingId()) {
- if (getIdBsonWriterCurrentLevel() == -1) {
+ if (getIdBsonWriterCurrentLevel() == DEFAULT_INITIAL_LEVEL) {
getIdBsonWriter().writeStartDocument();
}
getIdBsonWriter().writeStartArray(name);
@@ -141,7 +141,7 @@ public void writeStartArray(final String name) {
public void writeEndArray() {
if (isWritingId()) {
getIdBsonWriter().writeEndArray();
- if (getIdBsonWriterCurrentLevel() == 0 && idFieldIsAnArray) {
+ if (getIdBsonWriterCurrentLevel() == DEFAULT_INITIAL_LEVEL + 1 && idFieldIsAnArray) {
getIdBsonWriter().writeEndDocument();
id = new RawBsonDocument(getBytes()).get(ID_FIELD_NAME);
}
@@ -308,7 +308,7 @@ public void writeMinKey() {
@Override
public void writeName(final String name) {
setCurrentFieldName(name);
- if (getIdBsonWriterCurrentLevel() >= 0) {
+ if (getIdBsonWriterCurrentLevel() > DEFAULT_INITIAL_LEVEL) {
getIdBsonWriter().writeName(name);
}
super.writeName(name);
@@ -433,13 +433,13 @@ private void setCurrentFieldName(final String name) {
}
private boolean isWritingId() {
- return getIdBsonWriterCurrentLevel() >= 0 || (getCurrentLevel() == 0 && currentFieldName != null
+ return getIdBsonWriterCurrentLevel() > DEFAULT_INITIAL_LEVEL || (getCurrentLevel() == DEFAULT_INITIAL_LEVEL + 1 && currentFieldName != null
&& currentFieldName.equals(ID_FIELD_NAME));
}
private void addBsonValue(final Supplier<BsonValue> value, final Runnable writeValue) {
if (isWritingId()) {
- if (getIdBsonWriterCurrentLevel() >= 0) {
+ if (getIdBsonWriterCurrentLevel() > DEFAULT_INITIAL_LEVEL) {
writeValue.run();
} else {
id = value.get();
@@ -448,7 +448,7 @@ private void addBsonValue(final Supplier value, final Runnable writeV
}
private int getIdBsonWriterCurrentLevel() {
- return idBsonBinaryWriter == null ? -1 : idBsonBinaryWriter.getCurrentLevel();
+ return idBsonBinaryWriter == null ? DEFAULT_INITIAL_LEVEL : idBsonBinaryWriter.getCurrentLevel();
}
private LevelCountingBsonWriter getIdBsonWriter() {
diff --git a/driver-core/src/main/com/mongodb/internal/connection/LevelCountingBsonWriter.java b/driver-core/src/main/com/mongodb/internal/connection/LevelCountingBsonWriter.java
index 44889765fbf..3e9d0324bd7 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/LevelCountingBsonWriter.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/LevelCountingBsonWriter.java
@@ -20,10 +20,21 @@
abstract class LevelCountingBsonWriter extends BsonWriterDecorator {
- private int level = -1;
+ static final int DEFAULT_INITIAL_LEVEL = -1;
+
+ private int level;
LevelCountingBsonWriter(final BsonWriter bsonWriter) {
+ this(bsonWriter, DEFAULT_INITIAL_LEVEL);
+ }
+
+ /**
+ * @param initialLevel This parameter allows initializing the {@linkplain #getCurrentLevel() current level}
+ * with a value different from {@link #DEFAULT_INITIAL_LEVEL}.
+ */
+ LevelCountingBsonWriter(final BsonWriter bsonWriter, final int initialLevel) {
super(bsonWriter);
+ level = initialLevel;
}
int getCurrentLevel() {
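
For orientation, the counter presumably tracks document nesting: it starts at DEFAULT_INITIAL_LEVEL (-1), the top-level document is level 0, and each embedded document adds one. That is why AppendingBsonWriter above starts one level higher (it is created "inside" an already written document), and why the comparisons in IdHoldingBsonWriter above and SplittablePayloadBsonWriter below now read DEFAULT_INITIAL_LEVEL and DEFAULT_INITIAL_LEVEL + 1 instead of -1 and 0. A plain-integer illustration of the convention (not the class itself):

int level = -1; // DEFAULT_INITIAL_LEVEL: nothing written yet
level++;        // writeStartDocument() -> 0: top-level document
level++;        // writeName("inner"); writeStartDocument() -> 1: embedded document
level--;        // writeEndDocument() -> 0
level--;        // writeEndDocument() -> -1: top-level document finished
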
diff --git a/driver-core/src/main/com/mongodb/internal/connection/MessageSequences.java b/driver-core/src/main/com/mongodb/internal/connection/MessageSequences.java
new file mode 100644
index 00000000000..19600007404
--- /dev/null
+++ b/driver-core/src/main/com/mongodb/internal/connection/MessageSequences.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mongodb.internal.connection;
+
+/**
+ * Zero or more identifiable sequences contained in the {@code OP_MSG} section with payload type 1.
+ *
+ * This class is not part of the public API and may be removed or changed at any time.
+ * @see OP_MSG
+ */
+public abstract class MessageSequences {
+ public static final class EmptyMessageSequences extends MessageSequences {
+ public static final EmptyMessageSequences INSTANCE = new EmptyMessageSequences();
+
+ private EmptyMessageSequences() {
+ }
+ }
+}
diff --git a/driver-core/src/main/com/mongodb/internal/connection/MessageSettings.java b/driver-core/src/main/com/mongodb/internal/connection/MessageSettings.java
index 7a5734bc140..91fd862ce4b 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/MessageSettings.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/MessageSettings.java
@@ -42,6 +42,13 @@ public final class MessageSettings {
* {@code maxWriteBatchSize}.
*/
private static final int DEFAULT_MAX_BATCH_COUNT = 1000;
+ /**
+ * The headroom for documents that are not intended to be stored in a database.
+ * A command document is an example of such a document.
+ * This headroom allows a command document to specify a document that is intended to be stored in a database,
+ * even if the specified document is of the maximum size.
+ */
+ static final int DOCUMENT_HEADROOM_SIZE = 16 * (1 << 10);
private final int maxDocumentSize;
private final int maxMessageSize;
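
In concrete numbers, assuming the usual 16 MiB document limit (the rationale is the comment removed from RequestMessage below: the headroom lets a command document wrap a maximum-size user document, for example a 16M document upserted via findAndModify):

int maxDocumentSize = 16 * 1024 * 1024;                              // 16 MiB user-document limit
int documentHeadroomSize = 16 * (1 << 10);                           // DOCUMENT_HEADROOM_SIZE: 16 KiB = 16_384 bytes
int maxCommandDocumentSize = maxDocumentSize + documentHeadroomSize; // 16_793_600 bytes
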
diff --git a/driver-core/src/main/com/mongodb/internal/connection/RequestMessage.java b/driver-core/src/main/com/mongodb/internal/connection/RequestMessage.java
index 86e2ebd1dbe..dd09a59f763 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/RequestMessage.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/RequestMessage.java
@@ -18,24 +18,16 @@
import com.mongodb.lang.Nullable;
import org.bson.BsonBinaryWriter;
-import org.bson.BsonBinaryWriterSettings;
import org.bson.BsonDocument;
-import org.bson.BsonElement;
-import org.bson.BsonWriter;
-import org.bson.BsonWriterSettings;
import org.bson.FieldNameValidator;
-import org.bson.codecs.BsonValueCodecProvider;
-import org.bson.codecs.Codec;
-import org.bson.codecs.Encoder;
-import org.bson.codecs.EncoderContext;
-import org.bson.codecs.configuration.CodecRegistry;
import org.bson.io.BsonOutput;
-import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static com.mongodb.assertions.Assertions.notNull;
-import static org.bson.codecs.configuration.CodecRegistries.fromProviders;
+import static com.mongodb.internal.connection.BsonWriterHelper.backpatchLength;
+import static com.mongodb.internal.connection.BsonWriterHelper.createBsonBinaryWriter;
+import static com.mongodb.internal.connection.BsonWriterHelper.encodeUsingRegistry;
/**
* Abstract base class for all MongoDB Wire Protocol request messages.
@@ -46,12 +38,6 @@ abstract class RequestMessage {
static final int MESSAGE_PROLOGUE_LENGTH = 16;
- // Allow an extra 16K to the maximum allowed size of a query or command document, so that, for example,
- // a 16M document can be upserted via findAndModify
- private static final int DOCUMENT_HEADROOM = 16 * 1024;
-
- private static final CodecRegistry REGISTRY = fromProviders(new BsonValueCodecProvider());
-
private final String collectionName;
private final MessageSettings settings;
private final int id;
@@ -128,12 +114,12 @@ public MessageSettings getSettings() {
* @param bsonOutput the output
* @param operationContext the session context
*/
- public void encode(final BsonOutput bsonOutput, final OperationContext operationContext) {
+ public void encode(final ByteBufferBsonOutput bsonOutput, final OperationContext operationContext) {
notNull("operationContext", operationContext);
int messageStartPosition = bsonOutput.getPosition();
writeMessagePrologue(bsonOutput);
EncodingMetadata encodingMetadata = encodeMessageBodyWithMetadata(bsonOutput, operationContext);
- backpatchMessageLength(messageStartPosition, bsonOutput);
+ backpatchLength(messageStartPosition, bsonOutput);
this.encodingMetadata = encodingMetadata;
}
@@ -165,23 +151,13 @@ protected void writeMessagePrologue(final BsonOutput bsonOutput) {
* @param operationContext the session context
* @return the encoding metadata
*/
- protected abstract EncodingMetadata encodeMessageBodyWithMetadata(BsonOutput bsonOutput, OperationContext operationContext);
-
- protected void addDocument(final BsonDocument document, final BsonOutput bsonOutput,
- final FieldNameValidator validator, @Nullable final List<BsonElement> extraElements) {
- addDocument(document, getCodec(document), EncoderContext.builder().build(), bsonOutput, validator,
- settings.getMaxDocumentSize() + DOCUMENT_HEADROOM, extraElements);
- }
+ protected abstract EncodingMetadata encodeMessageBodyWithMetadata(ByteBufferBsonOutput bsonOutput, OperationContext operationContext);
- /**
- * Backpatches the message length into the beginning of the message.
- *
- * @param startPosition the start position of the message
- * @param bsonOutput the output
- */
- protected void backpatchMessageLength(final int startPosition, final BsonOutput bsonOutput) {
- int messageLength = bsonOutput.getPosition() - startPosition;
- bsonOutput.writeInt32(bsonOutput.getPosition() - messageLength, messageLength);
+ protected int writeDocument(final BsonDocument document, final BsonOutput bsonOutput, final FieldNameValidator validator) {
+ BsonBinaryWriter writer = createBsonBinaryWriter(bsonOutput, validator, getSettings());
+ int documentStart = bsonOutput.getPosition();
+ encodeUsingRegistry(writer, document);
+ return bsonOutput.getPosition() - documentStart;
}
/**
@@ -192,20 +168,4 @@ protected void backpatchMessageLength(final int startPosition, final BsonOutput
protected String getCollectionName() {
return collectionName;
}
-
- @SuppressWarnings("unchecked")
- Codec<BsonDocument> getCodec(final BsonDocument document) {
- return (Codec<BsonDocument>) REGISTRY.get(document.getClass());
- }
-
- private <T> void addDocument(final T obj, final Encoder<T> encoder, final EncoderContext encoderContext,
- final BsonOutput bsonOutput, final FieldNameValidator validator, final int maxDocumentSize,
- @Nullable final List<BsonElement> extraElements) {
- BsonBinaryWriter bsonBinaryWriter = new BsonBinaryWriter(new BsonWriterSettings(), new BsonBinaryWriterSettings(maxDocumentSize),
- bsonOutput, validator);
- BsonWriter bsonWriter = extraElements == null
- ? bsonBinaryWriter
- : new ElementExtendingBsonWriter(bsonBinaryWriter, extraElements);
- encoder.encode(bsonWriter, obj, encoderContext);
- }
}
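
backpatchLength now lives in BsonWriterHelper (that part of the file is not shown in this diff); judging from the removed backpatchMessageLength above, it presumably writes the byte count from a start position up to the current position into the int32 placeholder at that start position. A minimal sketch under that assumption:

import org.bson.io.BsonOutput;

// Assumed behaviour: patch an int32 length placeholder in place.
static void backpatchLength(final int startPosition, final BsonOutput bsonOutput) {
    int length = bsonOutput.getPosition() - startPosition;
    bsonOutput.writeInt32(startPosition, length);
}
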
diff --git a/driver-core/src/main/com/mongodb/internal/connection/SplittablePayload.java b/driver-core/src/main/com/mongodb/internal/connection/SplittablePayload.java
index 55bbac03b8b..9e52894f720 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/SplittablePayload.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/SplittablePayload.java
@@ -26,6 +26,7 @@
import org.bson.BsonObjectId;
import org.bson.BsonValue;
import org.bson.BsonWriter;
+import org.bson.FieldNameValidator;
import org.bson.codecs.BsonValueCodecProvider;
import org.bson.codecs.Codec;
import org.bson.codecs.Encoder;
@@ -54,8 +55,9 @@
*
* This class is not part of the public API and may be removed or changed at any time
*/
-public final class SplittablePayload {
+public final class SplittablePayload extends MessageSequences {
private static final CodecRegistry REGISTRY = fromProviders(new BsonValueCodecProvider());
+ private final FieldNameValidator fieldNameValidator;
private final WriteRequestEncoder writeRequestEncoder = new WriteRequestEncoder();
private final Type payloadType;
private final List<WriteRequestWithIndex> writeRequestWithIndexes;
@@ -94,10 +96,19 @@ public enum Type {
* @param payloadType the payload type
* @param writeRequestWithIndexes the writeRequests
*/
- public SplittablePayload(final Type payloadType, final List<WriteRequestWithIndex> writeRequestWithIndexes, final boolean ordered) {
+ public SplittablePayload(
+ final Type payloadType,
+ final List<WriteRequestWithIndex> writeRequestWithIndexes,
+ final boolean ordered,
+ final FieldNameValidator fieldNameValidator) {
this.payloadType = notNull("batchType", payloadType);
this.writeRequestWithIndexes = notNull("writeRequests", writeRequestWithIndexes);
this.ordered = ordered;
+ this.fieldNameValidator = notNull("fieldNameValidator", fieldNameValidator);
+ }
+
+ public FieldNameValidator getFieldNameValidator() {
+ return fieldNameValidator;
}
/**
@@ -175,7 +186,7 @@ boolean isOrdered() {
public SplittablePayload getNextSplit() {
isTrue("hasAnotherSplit", hasAnotherSplit());
List<WriteRequestWithIndex> nextPayLoad = writeRequestWithIndexes.subList(position, writeRequestWithIndexes.size());
- return new SplittablePayload(payloadType, nextPayLoad, ordered);
+ return new SplittablePayload(payloadType, nextPayLoad, ordered, fieldNameValidator);
}
/**
@@ -204,7 +215,8 @@ public void encode(final BsonWriter writer, final WriteRequestWithIndex writeReq
writer,
// Reuse `writeRequestDocumentId` if it may have been generated
// by `IdHoldingBsonWriter` in a previous attempt.
- // If its type is not `BsonObjectId`, we know it could not have been generated.
+ // If its type is not `BsonObjectId`, which happens only if `_id` was specified by the application,
+ // we know it could not have been generated.
writeRequestDocumentId instanceof BsonObjectId ? writeRequestDocumentId.asObjectId() : null);
getCodec(document).encode(idHoldingBsonWriter, document,
EncoderContext.builder().isEncodingCollectibleDocument(true).build());
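
A small construction sketch reflecting the signature change above: the payload now carries the FieldNameValidator that CommandMessage later retrieves via getFieldNameValidator(). The variables writeRequestWithIndexes and fieldNameValidator are assumed to be in scope.

SplittablePayload payload = new SplittablePayload(
        SplittablePayload.Type.INSERT, // payload type
        writeRequestWithIndexes,       // List<WriteRequestWithIndex>
        true,                          // ordered
        fieldNameValidator);           // now owned by the payload itself
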
diff --git a/driver-core/src/main/com/mongodb/internal/connection/SplittablePayloadBsonWriter.java b/driver-core/src/main/com/mongodb/internal/connection/SplittablePayloadBsonWriter.java
index ecff2c95a0c..e679a3b557c 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/SplittablePayloadBsonWriter.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/SplittablePayloadBsonWriter.java
@@ -63,7 +63,7 @@ public void writeStartDocument() {
@Override
public void writeEndDocument() {
- if (getCurrentLevel() == 0 && payload.hasPayload()) {
+ if (getCurrentLevel() == DEFAULT_INITIAL_LEVEL + 1 && payload.hasPayload()) {
writePayloadArray(writer, bsonOutput, settings, messageStartPosition, payload, maxSplittableDocumentSize);
}
super.writeEndDocument();
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java
index b3781fc66ff..f158b3944ae 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java
@@ -322,7 +322,7 @@ static AsyncCallbackSupplier decorateReadWithRetriesAsync(final RetryStat
static <R> AsyncCallbackSupplier<R> decorateWriteWithRetriesAsync(final RetryState retryState, final OperationContext operationContext,
final AsyncCallbackSupplier<R> asyncWriteFunction) {
return new RetryingAsyncCallbackSupplier<>(retryState, onRetryableWriteAttemptFailure(operationContext),
- CommandOperationHelper::shouldAttemptToRetryWrite, callback -> {
+ CommandOperationHelper::loggingShouldAttemptToRetryWriteAndAddRetryableLabel, callback -> {
logRetryExecute(retryState, operationContext);
asyncWriteFunction.get(callback);
});
@@ -344,7 +344,7 @@ static CommandReadTransformerAsync> asyncS
}
static <T> AsyncBatchCursor<T> cursorDocumentToAsyncBatchCursor(final TimeoutMode timeoutMode, final BsonDocument cursorDocument,
- final int batchSize, final Decoder<T> decoder, final BsonValue comment, final AsyncConnectionSource source,
+ final int batchSize, final Decoder<T> decoder, @Nullable final BsonValue comment, final AsyncConnectionSource source,
final AsyncConnection connection) {
return new AsyncCommandBatchCursor<>(timeoutMode, cursorDocument, batchSize, 0, decoder, comment, source, connection);
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java
index 70c847668ab..a70af7c64fd 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java
@@ -44,6 +44,9 @@
import com.mongodb.client.model.SearchIndexModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
+import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
+import com.mongodb.client.model.bulk.ClientBulkWriteResult;
+import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import com.mongodb.internal.TimeoutSettings;
@@ -294,6 +297,12 @@ public AsyncWriteOperation bulkWrite(final List extends Write
return operations.bulkWrite(requests, options);
}
+ public AsyncWriteOperation<ClientBulkWriteResult> clientBulkWriteOperation(
+ final List<? extends ClientNamespacedWriteModel> clientWriteModels,
+ @Nullable final ClientBulkWriteOptions options) {
+ return operations.clientBulkWriteOperation(clientWriteModels, options);
+ }
+
public AsyncReadOperation commandRead(final Bson command, final Class resultClass) {
return operations.commandRead(command, resultClass);
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/BatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/BatchCursor.java
index 5f86eb1f8fb..1463798ef64 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/BatchCursor.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/BatchCursor.java
@@ -25,6 +25,12 @@
import java.util.Iterator;
import java.util.List;
+import static java.util.Spliterator.IMMUTABLE;
+import static java.util.Spliterator.ORDERED;
+import static java.util.Spliterators.spliteratorUnknownSize;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.StreamSupport.stream;
+
/**
* MongoDB returns query results as batches, and this interface provides an iterator over those batches. The first call to
* the {@code next} method will return the first batch, and subsequent calls will trigger a request to get the next batch
@@ -98,4 +104,9 @@ public interface BatchCursor<T> extends Iterator<List<T>>, Closeable {
ServerCursor getServerCursor();
ServerAddress getServerAddress();
+
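+ /**
+ * Drains this cursor by iterating over all remaining batches and collecting them into a single list,
+ * one element per batch, in iteration order.
+ */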
+ default List<List<T>> exhaust() {
+ return stream(spliteratorUnknownSize(this, ORDERED | IMMUTABLE), false)
+ .collect(toList());
+ }
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/BulkWriteBatch.java b/driver-core/src/main/com/mongodb/internal/operation/BulkWriteBatch.java
index 1bca4734eff..1064bee14d3 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/BulkWriteBatch.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/BulkWriteBatch.java
@@ -64,7 +64,7 @@
import static com.mongodb.internal.bulk.WriteRequest.Type.REPLACE;
import static com.mongodb.internal.bulk.WriteRequest.Type.UPDATE;
import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull;
-import static com.mongodb.internal.operation.MixedBulkWriteOperation.commandWriteConcern;
+import static com.mongodb.internal.operation.CommandOperationHelper.commandWriteConcern;
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
import static com.mongodb.internal.operation.OperationHelper.isRetryableWrite;
import static com.mongodb.internal.operation.WriteConcernHelper.createWriteConcernError;
@@ -111,7 +111,7 @@ static BulkWriteBatch createBulkWriteBatch(final MongoNamespace namespace,
}
if (canRetryWrites && !writeRequestsAreRetryable) {
canRetryWrites = false;
- LOGGER.debug("retryWrites set but one or more writeRequests do not support retryable writes");
+ logWriteModelDoesNotSupportRetries();
}
return new BulkWriteBatch(namespace, connectionDescription, ordered, writeConcern, bypassDocumentValidation,
canRetryWrites, new BulkWriteBatchCombiner(connectionDescription.getServerAddress(), ordered, writeConcern),
@@ -154,7 +154,7 @@ private BulkWriteBatch(final MongoNamespace namespace, final ConnectionDescripti
this.indexMap = indexMap;
this.unprocessed = unprocessedItems;
- this.payload = new SplittablePayload(getPayloadType(batchType), payloadItems, ordered);
+ this.payload = new SplittablePayload(getPayloadType(batchType), payloadItems, ordered, getFieldNameValidator());
this.operationContext = operationContext;
this.comment = comment;
this.variables = variables;
@@ -270,7 +270,7 @@ BulkWriteBatch getNextBatch() {
}
}
- FieldNameValidator getFieldNameValidator() {
+ private FieldNameValidator getFieldNameValidator() {
if (batchType == UPDATE || batchType == REPLACE) {
Map rootMap;
if (batchType == REPLACE) {
@@ -385,4 +385,8 @@ private static boolean isRetryable(final WriteRequest writeRequest) {
}
return true;
}
+
+ static void logWriteModelDoesNotSupportRetries() {
+ LOGGER.debug("retryWrites set but one or more writeRequests do not support retryable writes");
+ }
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/ClientBulkWriteOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ClientBulkWriteOperation.java
new file mode 100644
index 00000000000..ccd7f272e95
--- /dev/null
+++ b/driver-core/src/main/com/mongodb/internal/operation/ClientBulkWriteOperation.java
@@ -0,0 +1,1338 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mongodb.internal.operation;
+
+import com.mongodb.ClientBulkWriteException;
+import com.mongodb.MongoClientException;
+import com.mongodb.MongoClientSettings;
+import com.mongodb.MongoCommandException;
+import com.mongodb.MongoException;
+import com.mongodb.MongoNamespace;
+import com.mongodb.MongoServerException;
+import com.mongodb.MongoSocketException;
+import com.mongodb.MongoWriteConcernException;
+import com.mongodb.ServerAddress;
+import com.mongodb.WriteConcern;
+import com.mongodb.WriteError;
+import com.mongodb.assertions.Assertions;
+import com.mongodb.bulk.WriteConcernError;
+import com.mongodb.client.cursor.TimeoutMode;
+import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
+import com.mongodb.client.model.bulk.ClientBulkWriteResult;
+import com.mongodb.client.model.bulk.ClientDeleteResult;
+import com.mongodb.client.model.bulk.ClientInsertOneResult;
+import com.mongodb.client.model.bulk.ClientNamespacedReplaceOneModel;
+import com.mongodb.client.model.bulk.ClientNamespacedUpdateOneModel;
+import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
+import com.mongodb.client.model.bulk.ClientUpdateResult;
+import com.mongodb.connection.ConnectionDescription;
+import com.mongodb.internal.TimeoutContext;
+import com.mongodb.internal.VisibleForTesting;
+import com.mongodb.internal.async.AsyncBatchCursor;
+import com.mongodb.internal.async.AsyncSupplier;
+import com.mongodb.internal.async.MutableValue;
+import com.mongodb.internal.async.SingleResultCallback;
+import com.mongodb.internal.async.function.AsyncCallbackSupplier;
+import com.mongodb.internal.async.function.RetryState;
+import com.mongodb.internal.binding.AsyncConnectionSource;
+import com.mongodb.internal.binding.AsyncWriteBinding;
+import com.mongodb.internal.binding.ConnectionSource;
+import com.mongodb.internal.binding.WriteBinding;
+import com.mongodb.internal.client.model.bulk.AbstractClientDeleteModel;
+import com.mongodb.internal.client.model.bulk.AbstractClientDeleteOptions;
+import com.mongodb.internal.client.model.bulk.AbstractClientNamespacedWriteModel;
+import com.mongodb.internal.client.model.bulk.AbstractClientUpdateModel;
+import com.mongodb.internal.client.model.bulk.AbstractClientUpdateOptions;
+import com.mongodb.internal.client.model.bulk.AcknowledgedSummaryClientBulkWriteResult;
+import com.mongodb.internal.client.model.bulk.AcknowledgedVerboseClientBulkWriteResult;
+import com.mongodb.internal.client.model.bulk.ClientWriteModel;
+import com.mongodb.internal.client.model.bulk.ConcreteClientBulkWriteOptions;
+import com.mongodb.internal.client.model.bulk.ConcreteClientDeleteManyModel;
+import com.mongodb.internal.client.model.bulk.ConcreteClientDeleteOneModel;
+import com.mongodb.internal.client.model.bulk.ConcreteClientDeleteResult;
+import com.mongodb.internal.client.model.bulk.ConcreteClientInsertOneModel;
+import com.mongodb.internal.client.model.bulk.ConcreteClientInsertOneResult;
+import com.mongodb.internal.client.model.bulk.ConcreteClientNamespacedDeleteManyModel;
+import com.mongodb.internal.client.model.bulk.ConcreteClientNamespacedDeleteOneModel;
+import com.mongodb.internal.client.model.bulk.ConcreteClientNamespacedInsertOneModel;
+import com.mongodb.internal.client.model.bulk.ConcreteClientNamespacedReplaceOneModel;
+import com.mongodb.internal.client.model.bulk.ConcreteClientNamespacedUpdateManyModel;
+import com.mongodb.internal.client.model.bulk.ConcreteClientNamespacedUpdateOneModel;
+import com.mongodb.internal.client.model.bulk.ConcreteClientReplaceOneModel;
+import com.mongodb.internal.client.model.bulk.ConcreteClientReplaceOneOptions;
+import com.mongodb.internal.client.model.bulk.ConcreteClientUpdateManyModel;
+import com.mongodb.internal.client.model.bulk.ConcreteClientUpdateOneModel;
+import com.mongodb.internal.client.model.bulk.ConcreteClientUpdateResult;
+import com.mongodb.internal.client.model.bulk.UnacknowledgedClientBulkWriteResult;
+import com.mongodb.internal.connection.AsyncConnection;
+import com.mongodb.internal.connection.Connection;
+import com.mongodb.internal.connection.DualMessageSequences;
+import com.mongodb.internal.connection.IdHoldingBsonWriter;
+import com.mongodb.internal.connection.MongoWriteConcernWithResponseException;
+import com.mongodb.internal.connection.OperationContext;
+import com.mongodb.internal.operation.retry.AttachmentKeys;
+import com.mongodb.internal.session.SessionContext;
+import com.mongodb.internal.validator.NoOpFieldNameValidator;
+import com.mongodb.internal.validator.ReplacingDocumentFieldNameValidator;
+import com.mongodb.internal.validator.UpdateFieldNameValidator;
+import com.mongodb.lang.Nullable;
+import org.bson.BsonArray;
+import org.bson.BsonBinaryWriter;
+import org.bson.BsonBoolean;
+import org.bson.BsonDocument;
+import org.bson.BsonElement;
+import org.bson.BsonInt32;
+import org.bson.BsonInt64;
+import org.bson.BsonObjectId;
+import org.bson.BsonValue;
+import org.bson.BsonWriter;
+import org.bson.FieldNameValidator;
+import org.bson.codecs.Encoder;
+import org.bson.codecs.EncoderContext;
+import org.bson.codecs.configuration.CodecRegistry;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+
+import static com.mongodb.assertions.Assertions.assertFalse;
+import static com.mongodb.assertions.Assertions.assertNotNull;
+import static com.mongodb.assertions.Assertions.assertTrue;
+import static com.mongodb.assertions.Assertions.fail;
+import static com.mongodb.internal.VisibleForTesting.AccessModifier.PACKAGE;
+import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
+import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
+import static com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker.WriteResult.FAIL_LIMIT_EXCEEDED;
+import static com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker.WriteResult.OK_LIMIT_NOT_REACHED;
+import static com.mongodb.internal.operation.AsyncOperationHelper.cursorDocumentToAsyncBatchCursor;
+import static com.mongodb.internal.operation.AsyncOperationHelper.decorateWriteWithRetriesAsync;
+import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncSourceAndConnection;
+import static com.mongodb.internal.operation.BulkWriteBatch.logWriteModelDoesNotSupportRetries;
+import static com.mongodb.internal.operation.CommandOperationHelper.commandWriteConcern;
+import static com.mongodb.internal.operation.CommandOperationHelper.initialRetryState;
+import static com.mongodb.internal.operation.CommandOperationHelper.shouldAttemptToRetryWriteAndAddRetryableLabel;
+import static com.mongodb.internal.operation.CommandOperationHelper.transformWriteException;
+import static com.mongodb.internal.operation.OperationHelper.isRetryableWrite;
+import static com.mongodb.internal.operation.SyncOperationHelper.cursorDocumentToBatchCursor;
+import static com.mongodb.internal.operation.SyncOperationHelper.decorateWriteWithRetries;
+import static com.mongodb.internal.operation.SyncOperationHelper.withSourceAndConnection;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonList;
+import static java.util.Optional.ofNullable;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+
+/**
+ * This class is not part of the public API and may be removed or changed at any time.
+ */
+public final class ClientBulkWriteOperation implements WriteOperation<ClientBulkWriteResult>, AsyncWriteOperation<ClientBulkWriteResult> {
+ private static final ConcreteClientBulkWriteOptions EMPTY_OPTIONS = new ConcreteClientBulkWriteOptions();
+ private static final String BULK_WRITE_COMMAND_NAME = "bulkWrite";
+ private static final EncoderContext DEFAULT_ENCODER_CONTEXT = EncoderContext.builder().build();
+ private static final EncoderContext COLLECTIBLE_DOCUMENT_ENCODER_CONTEXT = EncoderContext.builder()
+ .isEncodingCollectibleDocument(true).build();
+ private static final int INITIAL_BATCH_MODEL_START_INDEX = 0;
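+ // a cursor batch size of 0 lets the server choose how many documents to return per batch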
+ private static final int SERVER_DEFAULT_CURSOR_BATCH_SIZE = 0;
+
+ private final List<? extends ClientNamespacedWriteModel> models;
+ private final ConcreteClientBulkWriteOptions options;
+ private final WriteConcern writeConcernSetting;
+ private final boolean retryWritesSetting;
+ private final CodecRegistry codecRegistry;
+
+ /**
+ * @param retryWritesSetting See {@link MongoClientSettings#getRetryWrites()}.
+ */
+ public ClientBulkWriteOperation(
+ final List<? extends ClientNamespacedWriteModel> models,
+ @Nullable final ClientBulkWriteOptions options,
+ final WriteConcern writeConcernSetting,
+ final boolean retryWritesSetting,
+ final CodecRegistry codecRegistry) {
+ this.models = models;
+ this.options = options == null ? EMPTY_OPTIONS : (ConcreteClientBulkWriteOptions) options;
+ this.writeConcernSetting = writeConcernSetting;
+ this.retryWritesSetting = retryWritesSetting;
+ this.codecRegistry = codecRegistry;
+ }
+
+ @Override
+ public ClientBulkWriteResult execute(final WriteBinding binding) throws ClientBulkWriteException {
+ WriteConcern effectiveWriteConcern = validateAndGetEffectiveWriteConcern(binding.getOperationContext().getSessionContext());
+ ResultAccumulator resultAccumulator = new ResultAccumulator();
+ MongoException transformedTopLevelError = null;
+
+ try {
+ executeAllBatches(effectiveWriteConcern, binding, resultAccumulator);
+ } catch (MongoException topLevelError) {
+ transformedTopLevelError = transformWriteException(topLevelError);
+ }
+ return resultAccumulator.build(transformedTopLevelError, effectiveWriteConcern);
+ }
+
+
+ @Override
+ public void executeAsync(final AsyncWriteBinding binding,
+ final SingleResultCallback<ClientBulkWriteResult> finalCallback) {
+ WriteConcern effectiveWriteConcern = validateAndGetEffectiveWriteConcern(binding.getOperationContext().getSessionContext());
+ ResultAccumulator resultAccumulator = new ResultAccumulator();
+ MutableValue<MongoException> transformedTopLevelError = new MutableValue<>();
+
+ beginAsync().thenSupply(c -> {
+ executeAllBatchesAsync(effectiveWriteConcern, binding, resultAccumulator, c);
+ }).onErrorIf(topLevelError -> topLevelError instanceof MongoException, (topLevelError, c) -> {
+ transformedTopLevelError.set(transformWriteException((MongoException) topLevelError));
+ c.complete(c);
+ }).thenApply((ignored, c) -> {
+ c.complete(resultAccumulator.build(transformedTopLevelError.getNullable(), effectiveWriteConcern));
+ }).finish(finalCallback);
+ }
+
+ /**
+ * To execute a batch means:
+ *
+ * - execute a `bulkWrite` command, which creates a cursor;
+ * - consume the cursor, which may involve executing `getMore` commands.
+ *
+ *
+ * @throws MongoException When a {@linkplain ClientBulkWriteException#getCause() top-level error} happens.
+ */
+ private void executeAllBatches(
+ final WriteConcern effectiveWriteConcern,
+ final WriteBinding binding,
+ final ResultAccumulator resultAccumulator) throws MongoException {
+ Integer nextBatchStartModelIndex = INITIAL_BATCH_MODEL_START_INDEX;
+
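+ // `executeBatch` returns null either when all models have been executed
+ // or when the operation must not continue (see `ExhaustiveClientBulkWriteCommandOkResponse.operationMayContinue`)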
+ do {
+ nextBatchStartModelIndex = executeBatch(nextBatchStartModelIndex, effectiveWriteConcern, binding, resultAccumulator);
+ } while (nextBatchStartModelIndex != null);
+ }
+
+ /**
+ * @see #executeAllBatches(WriteConcern, WriteBinding, ResultAccumulator)
+ */
+ private void executeAllBatchesAsync(
+ final WriteConcern effectiveWriteConcern,
+ final AsyncWriteBinding binding,
+ final ResultAccumulator resultAccumulator,
+ final SingleResultCallback<Void> finalCallback) {
+ MutableValue<Integer> nextBatchStartModelIndex = new MutableValue<>(INITIAL_BATCH_MODEL_START_INDEX);
+
+ beginAsync().thenRunDoWhileLoop(iterationCallback -> {
+ beginAsync().thenSupply(c -> {
+ executeBatchAsync(nextBatchStartModelIndex.get(), effectiveWriteConcern, binding, resultAccumulator, c);
+ }).thenApply((nextBatchStartModelIdx, c) -> {
+ nextBatchStartModelIndex.set(nextBatchStartModelIdx);
+ c.complete(c);
+ }).finish(iterationCallback);
+ }, () -> nextBatchStartModelIndex.getNullable() != null
+ ).finish(finalCallback);
+ }
+
+ /**
+ * @return The start model index of the next batch, provided that the operation
+ * {@linkplain ExhaustiveClientBulkWriteCommandOkResponse#operationMayContinue(ConcreteClientBulkWriteOptions) may continue}
+ * and there are unexecuted {@linkplain ClientNamespacedWriteModel models} left.
+ */
+ @Nullable
+ private Integer executeBatch(
+ final int batchStartModelIndex,
+ final WriteConcern effectiveWriteConcern,
+ final WriteBinding binding,
+ final ResultAccumulator resultAccumulator) {
+ List<? extends ClientNamespacedWriteModel> unexecutedModels = models.subList(batchStartModelIndex, models.size());
+ assertFalse(unexecutedModels.isEmpty());
+ OperationContext operationContext = binding.getOperationContext();
+ SessionContext sessionContext = operationContext.getSessionContext();
+ TimeoutContext timeoutContext = operationContext.getTimeoutContext();
+ RetryState retryState = initialRetryState(retryWritesSetting, timeoutContext);
+ BatchEncoder batchEncoder = new BatchEncoder();
+
+ Supplier<ExhaustiveClientBulkWriteCommandOkResponse> retryingBatchExecutor = decorateWriteWithRetries(
+ retryState, operationContext,
+ // Each batch re-selects a server and re-checks out a connection because this is simpler,
+ // and it is allowed by https://jira.mongodb.org/browse/DRIVERS-2502.
+ // If connection pinning is required, `binding` handles that,
+ // and `ClientSession`, `TransactionContext` are aware of that.
+ () -> withSourceAndConnection(binding::getWriteConnectionSource, true,
+ (connectionSource, connection) -> {
+ ConnectionDescription connectionDescription = connection.getDescription();
+ boolean effectiveRetryWrites = isRetryableWrite(
+ retryWritesSetting, effectiveWriteConcern, connectionDescription, sessionContext);
+ retryState.breakAndThrowIfRetryAnd(() -> !effectiveRetryWrites);
+ resultAccumulator.onNewServerAddress(connectionDescription.getServerAddress());
+ retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true)
+ .attach(AttachmentKeys.commandDescriptionSupplier(), () -> BULK_WRITE_COMMAND_NAME, false);
+ ClientBulkWriteCommand bulkWriteCommand = createBulkWriteCommand(
+ retryState, effectiveRetryWrites, effectiveWriteConcern, sessionContext, unexecutedModels, batchEncoder,
+ () -> retryState.attach(AttachmentKeys.retryableCommandFlag(), true, true));
+ return executeBulkWriteCommandAndExhaustOkResponse(
+ retryState, connectionSource, connection, bulkWriteCommand, effectiveWriteConcern, operationContext);
+ })
+ );
+
+ try {
+ ExhaustiveClientBulkWriteCommandOkResponse bulkWriteCommandOkResponse = retryingBatchExecutor.get();
+ return resultAccumulator.onBulkWriteCommandOkResponseOrNoResponse(
+ batchStartModelIndex, bulkWriteCommandOkResponse, batchEncoder.intoEncodedBatchInfo());
+ } catch (MongoWriteConcernWithResponseException mongoWriteConcernWithOkResponseException) {
+ return resultAccumulator.onBulkWriteCommandOkResponseWithWriteConcernError(
+ batchStartModelIndex, mongoWriteConcernWithOkResponseException, batchEncoder.intoEncodedBatchInfo());
+ } catch (MongoCommandException bulkWriteCommandException) {
+ resultAccumulator.onBulkWriteCommandErrorResponse(bulkWriteCommandException);
+ throw bulkWriteCommandException;
+ } catch (MongoException mongoException) {
+ // The server does not have a chance to add "RetryableWriteError" label to `e`,
+ // and if it is the last attempt failure, `RetryingSyncSupplier` also may not have a chance
+ // to add the label. So we do that explicitly.
+ shouldAttemptToRetryWriteAndAddRetryableLabel(retryState, mongoException);
+ resultAccumulator.onBulkWriteCommandErrorWithoutResponse(mongoException);
+ throw mongoException;
+ }
+ }
+
+ /**
+ * @see #executeBatch(int, WriteConcern, WriteBinding, ResultAccumulator)
+ */
+ private void executeBatchAsync(
+ final int batchStartModelIndex,
+ final WriteConcern effectiveWriteConcern,
+ final AsyncWriteBinding binding,
+ final ResultAccumulator resultAccumulator,
+ final SingleResultCallback<Integer> finalCallback) {
+ List<? extends ClientNamespacedWriteModel> unexecutedModels = models.subList(batchStartModelIndex, models.size());
+ assertFalse(unexecutedModels.isEmpty());
+ OperationContext operationContext = binding.getOperationContext();
+ SessionContext sessionContext = operationContext.getSessionContext();
+ TimeoutContext timeoutContext = operationContext.getTimeoutContext();
+ RetryState retryState = initialRetryState(retryWritesSetting, timeoutContext);
+ BatchEncoder batchEncoder = new BatchEncoder();
+
+ AsyncCallbackSupplier<ExhaustiveClientBulkWriteCommandOkResponse> retryingBatchExecutor = decorateWriteWithRetriesAsync(
+ retryState, operationContext,
+ // Each batch re-selects a server and re-checks out a connection because this is simpler,
+ // and it is allowed by https://jira.mongodb.org/browse/DRIVERS-2502.
+ // If connection pinning is required, `binding` handles that,
+ // and `ClientSession`, `TransactionContext` are aware of that.
+ funcCallback -> withAsyncSourceAndConnection(binding::getWriteConnectionSource, true, funcCallback,
+ (connectionSource, connection, resultCallback) -> {
+ ConnectionDescription connectionDescription = connection.getDescription();
+ boolean effectiveRetryWrites = isRetryableWrite(
+ retryWritesSetting, effectiveWriteConcern, connectionDescription, sessionContext);
+ retryState.breakAndThrowIfRetryAnd(() -> !effectiveRetryWrites);
+ resultAccumulator.onNewServerAddress(connectionDescription.getServerAddress());
+ retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true)
+ .attach(AttachmentKeys.commandDescriptionSupplier(), () -> BULK_WRITE_COMMAND_NAME, false);
+ ClientBulkWriteCommand bulkWriteCommand = createBulkWriteCommand(
+ retryState, effectiveRetryWrites, effectiveWriteConcern, sessionContext, unexecutedModels, batchEncoder,
+ () -> retryState.attach(AttachmentKeys.retryableCommandFlag(), true, true));
+ executeBulkWriteCommandAndExhaustOkResponseAsync(
+ retryState, connectionSource, connection, bulkWriteCommand, effectiveWriteConcern, operationContext, resultCallback);
+ })
+ );
+
+ beginAsync().thenSupply(callback -> {
+ retryingBatchExecutor.get(callback);
+ }).thenApply((bulkWriteCommandOkResponse, callback) -> {
+ callback.complete(resultAccumulator.onBulkWriteCommandOkResponseOrNoResponse(
+ batchStartModelIndex, bulkWriteCommandOkResponse, batchEncoder.intoEncodedBatchInfo()));
+ }).onErrorIf(throwable -> true, (t, callback) -> {
+ if (t instanceof MongoWriteConcernWithResponseException) {
+ MongoWriteConcernWithResponseException mongoWriteConcernWithOkResponseException = (MongoWriteConcernWithResponseException) t;
+ callback.complete(resultAccumulator.onBulkWriteCommandOkResponseWithWriteConcernError(
+ batchStartModelIndex, mongoWriteConcernWithOkResponseException, batchEncoder.intoEncodedBatchInfo()));
+ } else if (t instanceof MongoCommandException) {
+ MongoCommandException bulkWriteCommandException = (MongoCommandException) t;
+ resultAccumulator.onBulkWriteCommandErrorResponse(bulkWriteCommandException);
+ callback.completeExceptionally(t);
+ } else if (t instanceof MongoException) {
+ MongoException mongoException = (MongoException) t;
+ // The server does not have a chance to add "RetryableWriteError" label to `e`,
+ // and if it is the last attempt failure, `RetryingSyncSupplier` also may not have a chance
+ // to add the label. So we do that explicitly.
+ shouldAttemptToRetryWriteAndAddRetryableLabel(retryState, mongoException);
+ resultAccumulator.onBulkWriteCommandErrorWithoutResponse(mongoException);
+ callback.completeExceptionally(mongoException);
+ } else {
+ callback.completeExceptionally(t);
+ }
+ }).finish(finalCallback);
+ }
+
+ /**
+ * @throws MongoWriteConcernWithResponseException This internal exception must be handled to avoid it being observed by an application.
+ * It {@linkplain MongoWriteConcernWithResponseException#getResponse() bears} the OK response to the {@code bulkWriteCommand},
+ * which must be
+ * {@linkplain ResultAccumulator#onBulkWriteCommandOkResponseWithWriteConcernError(int, MongoWriteConcernWithResponseException, BatchEncoder.EncodedBatchInfo) accumulated}
+ * iff this exception is the failed result of retries.
+ */
+ @Nullable
+ private ExhaustiveClientBulkWriteCommandOkResponse executeBulkWriteCommandAndExhaustOkResponse(
+ final RetryState retryState,
+ final ConnectionSource connectionSource,
+ final Connection connection,
+ final ClientBulkWriteCommand bulkWriteCommand,
+ final WriteConcern effectiveWriteConcern,
+ final OperationContext operationContext) throws MongoWriteConcernWithResponseException {
+ BsonDocument bulkWriteCommandOkResponse = connection.command(
+ "admin",
+ bulkWriteCommand.getCommandDocument(),
+ NoOpFieldNameValidator.INSTANCE,
+ null,
+ CommandResultDocumentCodec.create(codecRegistry.get(BsonDocument.class), CommandBatchCursorHelper.FIRST_BATCH),
+ operationContext,
+ effectiveWriteConcern.isAcknowledged(),
+ bulkWriteCommand.getOpsAndNsInfo());
+ if (bulkWriteCommandOkResponse == null) {
+ return null;
+ }
+ List<List<BsonDocument>> cursorExhaustBatches = doWithRetriesDisabledForCommand(retryState, "getMore", () ->
+ exhaustBulkWriteCommandOkResponseCursor(connectionSource, connection, bulkWriteCommandOkResponse));
+ return createExhaustiveClientBulkWriteCommandOkResponse(
+ bulkWriteCommandOkResponse,
+ cursorExhaustBatches,
+ connection.getDescription());
+ }
+
+ /**
+ * @see #executeBulkWriteCommandAndExhaustOkResponse(RetryState, ConnectionSource, Connection, ClientBulkWriteCommand, WriteConcern, OperationContext)
+ */
+ private void executeBulkWriteCommandAndExhaustOkResponseAsync(
+ final RetryState retryState,
+ final AsyncConnectionSource connectionSource,
+ final AsyncConnection connection,
+ final ClientBulkWriteCommand bulkWriteCommand,
+ final WriteConcern effectiveWriteConcern,
+ final OperationContext operationContext,
+ final SingleResultCallback<ExhaustiveClientBulkWriteCommandOkResponse> finalCallback) {
+ beginAsync().thenSupply(callback -> {
+ connection.commandAsync(
+ "admin",
+ bulkWriteCommand.getCommandDocument(),
+ NoOpFieldNameValidator.INSTANCE,
+ null,
+ CommandResultDocumentCodec.create(codecRegistry.get(BsonDocument.class), CommandBatchCursorHelper.FIRST_BATCH),
+ operationContext,
+ effectiveWriteConcern.isAcknowledged(),
+ bulkWriteCommand.getOpsAndNsInfo(), callback);
+ }).thenApply((bulkWriteCommandOkResponse, callback) -> {
+ if (bulkWriteCommandOkResponse == null) {
+ callback.complete((ExhaustiveClientBulkWriteCommandOkResponse) null);
+ return;
+ }
+ beginAsync().<List<List<BsonDocument>>>thenSupply(c -> {
+ doWithRetriesDisabledForCommandAsync(retryState, "getMore", (c1) -> {
+ exhaustBulkWriteCommandOkResponseCursorAsync(connectionSource, connection, bulkWriteCommandOkResponse, c1);
+ }, c);
+ }).thenApply((cursorExhaustBatches, c) -> {
+ c.complete(createExhaustiveClientBulkWriteCommandOkResponse(
+ bulkWriteCommandOkResponse,
+ cursorExhaustBatches,
+ connection.getDescription()));
+ }).finish(callback);
+ }).finish(finalCallback);
+ }
+
+ private static ExhaustiveClientBulkWriteCommandOkResponse createExhaustiveClientBulkWriteCommandOkResponse(
+ final BsonDocument bulkWriteCommandOkResponse, final List<List<BsonDocument>> cursorExhaustBatches,
+ final ConnectionDescription connectionDescription) {
+ ExhaustiveClientBulkWriteCommandOkResponse exhaustiveBulkWriteCommandOkResponse =
+ new ExhaustiveClientBulkWriteCommandOkResponse(
+ bulkWriteCommandOkResponse, cursorExhaustBatches);
+
+ // `Connection.command` does not throw `MongoWriteConcernException`, so we have to construct it ourselves
+ MongoWriteConcernException writeConcernException = Exceptions.createWriteConcernException(
+ bulkWriteCommandOkResponse, connectionDescription.getServerAddress());
+ if (writeConcernException != null) {
+ throw new MongoWriteConcernWithResponseException(writeConcernException, exhaustiveBulkWriteCommandOkResponse);
+ }
+ return exhaustiveBulkWriteCommandOkResponse;
+ }
+
+ private <R> R doWithRetriesDisabledForCommand(
+ final RetryState retryState,
+ final String commandDescription,
+ final Supplier<R> actionWithCommand) {
+ Optional<Boolean> originalRetryableCommandFlag = retryState.attachment(AttachmentKeys.retryableCommandFlag());
+ Supplier<String> originalCommandDescriptionSupplier = retryState.attachment(AttachmentKeys.commandDescriptionSupplier())
+ .orElseThrow(Assertions::fail);
+
+ try {
+ retryState.attach(AttachmentKeys.retryableCommandFlag(), false, true)
+ .attach(AttachmentKeys.commandDescriptionSupplier(), () -> commandDescription, false);
+ return actionWithCommand.get();
+ } finally {
+ originalRetryableCommandFlag.ifPresent(value -> retryState.attach(AttachmentKeys.retryableCommandFlag(), value, true));
+ retryState.attach(AttachmentKeys.commandDescriptionSupplier(), originalCommandDescriptionSupplier, false);
+ }
+ }
+
+ private <R> void doWithRetriesDisabledForCommandAsync(
+ final RetryState retryState,
+ final String commandDescription,
+ final AsyncSupplier<R> actionWithCommand,
+ final SingleResultCallback<R> finalCallback) {
+ Optional<Boolean> originalRetryableCommandFlag = retryState.attachment(AttachmentKeys.retryableCommandFlag());
+ Supplier<String> originalCommandDescriptionSupplier = retryState.attachment(AttachmentKeys.commandDescriptionSupplier())
+ .orElseThrow(Assertions::fail);
+
+ beginAsync().thenSupply(c -> {
+ retryState.attach(AttachmentKeys.retryableCommandFlag(), false, true)
+ .attach(AttachmentKeys.commandDescriptionSupplier(), () -> commandDescription, false);
+ actionWithCommand.finish(c);
+ }).thenAlwaysRunAndFinish(() -> {
+ originalRetryableCommandFlag.ifPresent(value -> retryState.attach(AttachmentKeys.retryableCommandFlag(), value, true));
+ retryState.attach(AttachmentKeys.commandDescriptionSupplier(), originalCommandDescriptionSupplier, false);
+ }, finalCallback);
+ }
+
+ private List<List<BsonDocument>> exhaustBulkWriteCommandOkResponseCursor(
+ final ConnectionSource connectionSource,
+ final Connection connection,
+ final BsonDocument response) {
+ try (CommandBatchCursor<BsonDocument> cursor = cursorDocumentToBatchCursor(
+ TimeoutMode.CURSOR_LIFETIME,
+ response,
+ SERVER_DEFAULT_CURSOR_BATCH_SIZE,
+ codecRegistry.get(BsonDocument.class),
+ options.getComment().orElse(null),
+ connectionSource,
+ connection)) {
+
+ return cursor.exhaust();
+ }
+ }
+
+ private void exhaustBulkWriteCommandOkResponseCursorAsync(final AsyncConnectionSource connectionSource,
+ final AsyncConnection connection,
+ final BsonDocument bulkWriteCommandOkResponse,
+ final SingleResultCallback<List<List<BsonDocument>>> finalCallback) {
+ AsyncBatchCursor<BsonDocument> cursor = cursorDocumentToAsyncBatchCursor(
+ TimeoutMode.CURSOR_LIFETIME,
+ bulkWriteCommandOkResponse,
+ SERVER_DEFAULT_CURSOR_BATCH_SIZE,
+ codecRegistry.get(BsonDocument.class),
+ options.getComment().orElse(null),
+ connectionSource,
+ connection);
+
+ beginAsync().<List<List<BsonDocument>>>thenSupply(callback -> {
+ cursor.exhaust(callback);
+ }).thenAlwaysRunAndFinish(() -> {
+ cursor.close();
+ }, finalCallback);
+ }
+
+
+ private ClientBulkWriteCommand createBulkWriteCommand(
+ final RetryState retryState,
+ final boolean effectiveRetryWrites,
+ final WriteConcern effectiveWriteConcern,
+ final SessionContext sessionContext,
+ final List extends ClientNamespacedWriteModel> unexecutedModels,
+ final BatchEncoder batchEncoder,
+ final Runnable retriesEnabler) {
+ BsonDocument commandDocument = new BsonDocument(BULK_WRITE_COMMAND_NAME, new BsonInt32(1))
+ .append("errorsOnly", BsonBoolean.valueOf(!options.isVerboseResults()))
+ .append("ordered", BsonBoolean.valueOf(options.isOrdered()));
+ options.isBypassDocumentValidation().ifPresent(value ->
+ commandDocument.append("bypassDocumentValidation", BsonBoolean.valueOf(value)));
+ options.getComment().ifPresent(value ->
+ commandDocument.append("comment", value));
+ options.getLet().ifPresent(let ->
+ commandDocument.append("let", let.toBsonDocument(BsonDocument.class, codecRegistry)));
+ commandWriteConcern(effectiveWriteConcern, sessionContext).ifPresent(value->
+ commandDocument.append("writeConcern", value.asDocument()));
+ return new ClientBulkWriteCommand(
+ commandDocument,
+ new ClientBulkWriteCommand.OpsAndNsInfo(
+ effectiveRetryWrites, unexecutedModels,
+ batchEncoder,
+ options,
+ () -> {
+ retriesEnabler.run();
+ return retryState.isFirstAttempt()
+ ? sessionContext.advanceTransactionNumber()
+ : sessionContext.getTransactionNumber();
+ }));
+ }
+
+ private WriteConcern validateAndGetEffectiveWriteConcern(final SessionContext sessionContext) {
+ WriteConcern effectiveWriteConcern = CommandOperationHelper.validateAndGetEffectiveWriteConcern(writeConcernSetting, sessionContext);
+ if (!effectiveWriteConcern.isAcknowledged()) {
+ if (options.isVerboseResults()) {
+ throw new MongoClientException("Cannot request unacknowledged write concern and verbose results");
+ }
+ if (options.isOrdered()) {
+ throw new MongoClientException("Cannot request unacknowledged write concern and ordered writes");
+ }
+ }
+ return effectiveWriteConcern;
+ }
+
+ private <T> void encodeUsingRegistry(final BsonWriter writer, final T value) {
+ encodeUsingRegistry(writer, value, DEFAULT_ENCODER_CONTEXT);
+ }
+
+ private <T> void encodeUsingRegistry(final BsonWriter writer, final T value, final EncoderContext encoderContext) {
+ @SuppressWarnings("unchecked")
+ Encoder<T> encoder = (Encoder<T>) codecRegistry.get(value.getClass());
+ encoder.encode(writer, value, encoderContext);
+ }
+
+ private static AbstractClientNamespacedWriteModel getNamespacedModel(
+ final List<? extends ClientNamespacedWriteModel> models, final int index) {
+ return (AbstractClientNamespacedWriteModel) models.get(index);
+ }
+
+ public static final class Exceptions {
+ public static Optional<ServerAddress> serverAddressFromException(@Nullable final MongoException exception) {
+ ServerAddress serverAddress = null;
+ if (exception instanceof MongoServerException) {
+ serverAddress = ((MongoServerException) exception).getServerAddress();
+ } else if (exception instanceof MongoSocketException) {
+ serverAddress = ((MongoSocketException) exception).getServerAddress();
+ }
+ return ofNullable(serverAddress);
+ }
+
+ @Nullable
+ private static MongoWriteConcernException createWriteConcernException(
+ final BsonDocument response,
+ final ServerAddress serverAddress) {
+ final String writeConcernErrorFieldName = "writeConcernError";
+ if (!response.containsKey(writeConcernErrorFieldName)) {
+ return null;
+ }
+ BsonDocument writeConcernErrorDocument = response.getDocument(writeConcernErrorFieldName);
+ WriteConcernError writeConcernError = WriteConcernHelper.createWriteConcernError(writeConcernErrorDocument);
+ Set<String> errorLabels = response.getArray("errorLabels", new BsonArray()).stream()
+ .map(i -> i.asString().getValue())
+ .collect(toSet());
+ return new MongoWriteConcernException(writeConcernError, null, serverAddress, errorLabels);
+ }
+ }
+
+ private static final class ExhaustiveClientBulkWriteCommandOkResponse {
+ /**
+ * The number of unsuccessful individual write operations.
+ */
+ private final int nErrors;
+ private final int nInserted;
+ private final int nUpserted;
+ private final int nMatched;
+ private final int nModified;
+ private final int nDeleted;
+ private final List<BsonDocument> cursorExhaust;
+
+ ExhaustiveClientBulkWriteCommandOkResponse(
+ final BsonDocument bulkWriteCommandOkResponse,
+ final List<List<BsonDocument>> cursorExhaustBatches) {
+ this.nErrors = bulkWriteCommandOkResponse.getInt32("nErrors").getValue();
+ this.nInserted = bulkWriteCommandOkResponse.getInt32("nInserted").getValue();
+ this.nUpserted = bulkWriteCommandOkResponse.getInt32("nUpserted").getValue();
+ this.nMatched = bulkWriteCommandOkResponse.getInt32("nMatched").getValue();
+ this.nModified = bulkWriteCommandOkResponse.getInt32("nModified").getValue();
+ this.nDeleted = bulkWriteCommandOkResponse.getInt32("nDeleted").getValue();
+ if (cursorExhaustBatches.isEmpty()) {
+ cursorExhaust = emptyList();
+ } else if (cursorExhaustBatches.size() == 1) {
+ cursorExhaust = cursorExhaustBatches.get(0);
+ } else {
+ cursorExhaust = cursorExhaustBatches.stream().flatMap(Collection::stream).collect(toList());
+ }
+ }
+
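+ // an ordered operation stops after the first failed individual write, while an unordered one continues despite errors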
+ boolean operationMayContinue(final ConcreteClientBulkWriteOptions options) {
+ return nErrors == 0 || !options.isOrdered();
+ }
+
+ int getNErrors() {
+ return nErrors;
+ }
+
+ int getNInserted() {
+ return nInserted;
+ }
+
+ int getNUpserted() {
+ return nUpserted;
+ }
+
+ int getNMatched() {
+ return nMatched;
+ }
+
+ int getNModified() {
+ return nModified;
+ }
+
+ int getNDeleted() {
+ return nDeleted;
+ }
+
+ List<BsonDocument> getCursorExhaust() {
+ return cursorExhaust;
+ }
+ }
+
+ /**
+ * Accumulates results of the operation as it is being executed
+ * for {@linkplain #build(MongoException, WriteConcern) building} them when the operation completes.
+ */
+ private final class ResultAccumulator {
+ @Nullable
+ private ServerAddress serverAddress;
+ private final ArrayList<BatchResult> batchResults;
+
+ ResultAccumulator() {
+ serverAddress = null;
+ batchResults = new ArrayList<>();
+ }
+
+ /**
+ *
+ * - Either builds and returns {@link ClientBulkWriteResult};
+ * - or builds and throws {@link ClientBulkWriteException};
+ * - or throws {@code topLevelError}.
+ *
+ */
+ ClientBulkWriteResult build(@Nullable final MongoException topLevelError, final WriteConcern effectiveWriteConcern) throws MongoException {
+ boolean verboseResultsSetting = options.isVerboseResults();
+ boolean batchResultsHaveResponses = false;
+ boolean batchResultsHaveInfoAboutSuccessfulIndividualOperations = false;
+ long insertedCount = 0;
+ long upsertedCount = 0;
+ long matchedCount = 0;
+ long modifiedCount = 0;
+ long deletedCount = 0;
+ Map<Integer, ClientInsertOneResult> insertResults = verboseResultsSetting ? new HashMap<>() : emptyMap();
+ Map<Integer, ClientUpdateResult> updateResults = verboseResultsSetting ? new HashMap<>() : emptyMap();
+ Map<Integer, ClientDeleteResult> deleteResults = verboseResultsSetting ? new HashMap<>() : emptyMap();
+ ArrayList<WriteConcernError> writeConcernErrors = new ArrayList<>();
+ Map<Integer, WriteError> writeErrors = new HashMap<>();
+ for (BatchResult batchResult : batchResults) {
+ if (batchResult.hasResponse()) {
+ batchResultsHaveResponses = true;
+ MongoWriteConcernException writeConcernException = batchResult.getWriteConcernException();
+ if (writeConcernException != null) {
+ writeConcernErrors.add(writeConcernException.getWriteConcernError());
+ }
+ int batchStartModelIndex = batchResult.getBatchStartModelIndex();
+ ExhaustiveClientBulkWriteCommandOkResponse response = batchResult.getResponse();
+ boolean orderedSetting = options.isOrdered();
+ int nErrors = response.getNErrors();
+ batchResultsHaveInfoAboutSuccessfulIndividualOperations = batchResultsHaveInfoAboutSuccessfulIndividualOperations
+ || (orderedSetting && nErrors == 0)
+ || (!orderedSetting && nErrors < batchResult.getBatchModelsCount());
+ insertedCount += response.getNInserted();
+ upsertedCount += response.getNUpserted();
+ matchedCount += response.getNMatched();
+ modifiedCount += response.getNModified();
+ deletedCount += response.getNDeleted();
+ Map<Integer, BsonValue> insertModelDocumentIds = batchResult.getInsertModelDocumentIds();
+ for (BsonDocument individualOperationResponse : response.getCursorExhaust()) {
+ int individualOperationIndexInBatch = individualOperationResponse.getInt32("idx").getValue();
+ int writeModelIndex = batchStartModelIndex + individualOperationIndexInBatch;
+ if (individualOperationResponse.getNumber("ok").intValue() == 1) {
+ assertTrue(verboseResultsSetting);
+ AbstractClientNamespacedWriteModel writeModel = getNamespacedModel(models, writeModelIndex);
+ if (writeModel instanceof ConcreteClientNamespacedInsertOneModel) {
+ insertResults.put(
+ writeModelIndex,
+ new ConcreteClientInsertOneResult(insertModelDocumentIds.get(individualOperationIndexInBatch)));
+ } else if (writeModel instanceof ConcreteClientNamespacedUpdateOneModel
+ || writeModel instanceof ConcreteClientNamespacedUpdateManyModel
+ || writeModel instanceof ConcreteClientNamespacedReplaceOneModel) {
+ BsonDocument upsertedIdDocument = individualOperationResponse.getDocument("upserted", null);
+ updateResults.put(
+ writeModelIndex,
+ new ConcreteClientUpdateResult(
+ individualOperationResponse.getInt32("n").getValue(),
+ individualOperationResponse.getInt32("nModified").getValue(),
+ upsertedIdDocument == null ? null : upsertedIdDocument.get("_id")));
+ } else if (writeModel instanceof ConcreteClientNamespacedDeleteOneModel
+ || writeModel instanceof ConcreteClientNamespacedDeleteManyModel) {
+ deleteResults.put(
+ writeModelIndex,
+ new ConcreteClientDeleteResult(individualOperationResponse.getInt32("n").getValue()));
+ } else {
+ fail(writeModel.getClass().toString());
+ }
+ } else {
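+ // with ordered execution the server stops at the first error, so a failing index greater than 0
+ // implies that the preceding operations in this batch succeeded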
+ batchResultsHaveInfoAboutSuccessfulIndividualOperations = batchResultsHaveInfoAboutSuccessfulIndividualOperations
+ || (orderedSetting && individualOperationIndexInBatch > 0);
+ WriteError individualOperationWriteError = new WriteError(
+ individualOperationResponse.getInt32("code").getValue(),
+ individualOperationResponse.getString("errmsg").getValue(),
+ individualOperationResponse.getDocument("errInfo", new BsonDocument()));
+ writeErrors.put(writeModelIndex, individualOperationWriteError);
+ }
+ }
+ }
+ }
+ if (topLevelError == null && writeConcernErrors.isEmpty() && writeErrors.isEmpty()) {
+ if (effectiveWriteConcern.isAcknowledged()) {
+ AcknowledgedSummaryClientBulkWriteResult summaryResult = new AcknowledgedSummaryClientBulkWriteResult(
+ insertedCount, upsertedCount, matchedCount, modifiedCount, deletedCount);
+ return verboseResultsSetting
+ ? new AcknowledgedVerboseClientBulkWriteResult(summaryResult, insertResults, updateResults, deleteResults)
+ : summaryResult;
+ } else {
+ return UnacknowledgedClientBulkWriteResult.INSTANCE;
+ }
+ } else if (batchResultsHaveResponses) {
+ AcknowledgedSummaryClientBulkWriteResult partialSummaryResult = batchResultsHaveInfoAboutSuccessfulIndividualOperations
+ ? new AcknowledgedSummaryClientBulkWriteResult(insertedCount, upsertedCount, matchedCount, modifiedCount, deletedCount)
+ : null;
+ throw new ClientBulkWriteException(
+ topLevelError,
+ writeConcernErrors,
+ writeErrors,
+ verboseResultsSetting && partialSummaryResult != null
+ ? new AcknowledgedVerboseClientBulkWriteResult(partialSummaryResult, insertResults, updateResults, deleteResults)
+ : partialSummaryResult,
+ assertNotNull(serverAddress));
+ } else {
+ throw assertNotNull(topLevelError);
+ }
+ }
+
+ void onNewServerAddress(final ServerAddress serverAddress) {
+ this.serverAddress = serverAddress;
+ }
+
+ @Nullable
+ Integer onBulkWriteCommandOkResponseOrNoResponse(
+ final int batchStartModelIndex,
+ @Nullable
+ final ExhaustiveClientBulkWriteCommandOkResponse response,
+ final BatchEncoder.EncodedBatchInfo encodedBatchInfo) {
+ return onBulkWriteCommandOkResponseOrNoResponse(batchStartModelIndex, response, null, encodedBatchInfo);
+ }
+
+ /**
+ * @return See {@link #executeBatch(int, WriteConcern, WriteBinding, ResultAccumulator)}.
+ */
+ @Nullable
+ Integer onBulkWriteCommandOkResponseWithWriteConcernError(
+ final int batchStartModelIndex,
+ final MongoWriteConcernWithResponseException exception,
+ final BatchEncoder.EncodedBatchInfo encodedBatchInfo) {
+ MongoWriteConcernException writeConcernException = (MongoWriteConcernException) exception.getCause();
+ onNewServerAddress(writeConcernException.getServerAddress());
+ ExhaustiveClientBulkWriteCommandOkResponse response = (ExhaustiveClientBulkWriteCommandOkResponse) exception.getResponse();
+ return onBulkWriteCommandOkResponseOrNoResponse(batchStartModelIndex, response, writeConcernException, encodedBatchInfo);
+ }
+
+ /**
+ * @return See {@link #executeBatch(int, WriteConcern, WriteBinding, ResultAccumulator)}.
+ */
+ @Nullable
+ private Integer onBulkWriteCommandOkResponseOrNoResponse(
+ final int batchStartModelIndex,
+ @Nullable
+ final ExhaustiveClientBulkWriteCommandOkResponse response,
+ @Nullable
+ final MongoWriteConcernException writeConcernException,
+ final BatchEncoder.EncodedBatchInfo encodedBatchInfo) {
+ BatchResult batchResult = response == null
+ ? BatchResult.noResponse(batchStartModelIndex, encodedBatchInfo)
+ : BatchResult.okResponse(batchStartModelIndex, encodedBatchInfo, response, writeConcernException);
+ batchResults.add(batchResult);
+ int potentialNextBatchStartModelIndex = batchStartModelIndex + batchResult.getBatchModelsCount();
+ return (response == null || response.operationMayContinue(options))
+ ? potentialNextBatchStartModelIndex == models.size() ? null : potentialNextBatchStartModelIndex
+ : null;
+ }
+
+ void onBulkWriteCommandErrorResponse(final MongoCommandException exception) {
+ onNewServerAddress(exception.getServerAddress());
+ }
+
+ void onBulkWriteCommandErrorWithoutResponse(final MongoException exception) {
+ Exceptions.serverAddressFromException(exception).ifPresent(this::onNewServerAddress);
+ }
+ }
+
+ public static final class ClientBulkWriteCommand {
+ private final BsonDocument commandDocument;
+ private final OpsAndNsInfo opsAndNsInfo;
+
+ ClientBulkWriteCommand(
+ final BsonDocument commandDocument,
+ final OpsAndNsInfo opsAndNsInfo) {
+ this.commandDocument = commandDocument;
+ this.opsAndNsInfo = opsAndNsInfo;
+ }
+
+ BsonDocument getCommandDocument() {
+ return commandDocument;
+ }
+
+ OpsAndNsInfo getOpsAndNsInfo() {
+ return opsAndNsInfo;
+ }
+
+ public static final class OpsAndNsInfo extends DualMessageSequences {
+ private final boolean effectiveRetryWrites;
+ private final List<? extends ClientNamespacedWriteModel> models;
+ private final BatchEncoder batchEncoder;
+ private final ConcreteClientBulkWriteOptions options;
+ private final Supplier<Long> doIfCommandIsRetryableAndAdvanceGetTxnNumber;
+
+ @VisibleForTesting(otherwise = PACKAGE)
+ public OpsAndNsInfo(
+ final boolean effectiveRetryWrites,
+ final List<? extends ClientNamespacedWriteModel> models,
+ final BatchEncoder batchEncoder,
+ final ConcreteClientBulkWriteOptions options,
+ final Supplier<Long> doIfCommandIsRetryableAndAdvanceGetTxnNumber) {
+ super("ops", new OpsFieldNameValidator(models), "nsInfo", NoOpFieldNameValidator.INSTANCE);
+ this.effectiveRetryWrites = effectiveRetryWrites;
+ this.models = models;
+ this.batchEncoder = batchEncoder;
+ this.options = options;
+ this.doIfCommandIsRetryableAndAdvanceGetTxnNumber = doIfCommandIsRetryableAndAdvanceGetTxnNumber;
+ }
+
+ @Override
+ public EncodeDocumentsResult encodeDocuments(final WritersProviderAndLimitsChecker writersProviderAndLimitsChecker) {
+ // We must call `batchEncoder.reset` lazily, that is here, and not eagerly before a command retry attempt,
+ // because a retry attempt may fail before encoding,
+ // in which case we need the information gathered by `batchEncoder` at a previous attempt.
+ batchEncoder.reset();
+ LinkedHashMap<MongoNamespace, Integer> indexedNamespaces = new LinkedHashMap<>();
+ WritersProviderAndLimitsChecker.WriteResult writeResult = OK_LIMIT_NOT_REACHED;
+ boolean commandIsRetryable = effectiveRetryWrites;
+ int maxModelIndexInBatch = -1;
+ for (int modelIndexInBatch = 0; modelIndexInBatch < models.size() && writeResult == OK_LIMIT_NOT_REACHED; modelIndexInBatch++) {
+ AbstractClientNamespacedWriteModel namespacedModel = getNamespacedModel(models, modelIndexInBatch);
+ MongoNamespace namespace = namespacedModel.getNamespace();
+ int indexedNamespacesSizeBeforeCompute = indexedNamespaces.size();
+ int namespaceIndexInBatch = indexedNamespaces.computeIfAbsent(namespace, k -> indexedNamespacesSizeBeforeCompute);
+ boolean writeNewNamespace = indexedNamespaces.size() != indexedNamespacesSizeBeforeCompute;
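+ // `writeNewNamespace` is true iff this batch has not referenced `namespace` before;
+ // in that case a matching entry must also be written to the `nsInfo` sequence below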
+ int finalModelIndexInBatch = modelIndexInBatch;
+ writeResult = writersProviderAndLimitsChecker.tryWrite((opsWriter, nsInfoWriter) -> {
+ batchEncoder.encodeWriteModel(opsWriter, namespacedModel.getModel(), finalModelIndexInBatch, namespaceIndexInBatch);
+ if (writeNewNamespace) {
+ nsInfoWriter.writeStartDocument();
+ nsInfoWriter.writeString("ns", namespace.getFullName());
+ nsInfoWriter.writeEndDocument();
+ }
+ return finalModelIndexInBatch + 1;
+ });
+ if (writeResult == FAIL_LIMIT_EXCEEDED) {
+ batchEncoder.reset(finalModelIndexInBatch);
+ } else {
+ maxModelIndexInBatch = finalModelIndexInBatch;
+ if (commandIsRetryable && doesNotSupportRetries(namespacedModel)) {
+ commandIsRetryable = false;
+ logWriteModelDoesNotSupportRetries();
+ }
+ }
+ }
+ return new EncodeDocumentsResult(
+ // we will execute more batches, so we must request a response to maintain the order of individual write operations
+ options.isOrdered() && maxModelIndexInBatch < models.size() - 1,
+ commandIsRetryable
+ ? singletonList(new BsonElement("txnNumber", new BsonInt64(doIfCommandIsRetryableAndAdvanceGetTxnNumber.get())))
+ : emptyList());
+ }
+
+ private static boolean doesNotSupportRetries(final AbstractClientNamespacedWriteModel model) {
+ return model instanceof ConcreteClientNamespacedUpdateManyModel || model instanceof ConcreteClientNamespacedDeleteManyModel;
+ }
+
+ /**
+ * The server supports only the {@code update} individual write operation in the {@code ops} array field, while the driver supports
+ * {@link ClientNamespacedUpdateOneModel}, {@code ClientNamespacedUpdateManyModel}, {@link ClientNamespacedReplaceOneModel}.
+ * The difference between updating and replacing is only in the document specified via the {@code updateMods} field:
+ *
+ * - if the name of the first field starts with {@code '$'}, then the document is interpreted as specifying update operators;
+ * - if the name of the first field does not start with {@code '$'}, then the document is interpreted as a replacement.
+ *
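+ * For example, an {@code updateMods} document {@code {"$set": {"x": 1}}} is interpreted as specifying update operators,
+ * while {@code {"x": 1}} is interpreted as a replacement document.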
+ *
+ * @see
+ * Update vs. replace document validation
+ */
+ private static final class OpsFieldNameValidator implements FieldNameValidator {
+ private static final Set<String> OPERATION_DISCRIMINATOR_FIELD_NAMES = Stream.of("insert", "update", "delete").collect(toSet());
+
+ private final List<? extends ClientNamespacedWriteModel> models;
+ private final ReplacingUpdateModsFieldValidator replacingValidator;
+ private final UpdatingUpdateModsFieldValidator updatingValidator;
+ private int currentIndividualOperationIndex;
+
+ OpsFieldNameValidator(final List<? extends ClientNamespacedWriteModel> models) {
+ this.models = models;
+ replacingValidator = new ReplacingUpdateModsFieldValidator();
+ updatingValidator = new UpdatingUpdateModsFieldValidator();
+ currentIndividualOperationIndex = -1;
+ }
+
+ @Override
+ public boolean validate(final String fieldName) {
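+ // every document in the `ops` array starts with exactly one of the discriminator fields
+ // (see `BatchEncoder.encodeWriteModel`), so encountering one marks the start of the next individual operation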
+ if (OPERATION_DISCRIMINATOR_FIELD_NAMES.contains(fieldName)) {
+ currentIndividualOperationIndex++;
+ }
+ return true;
+ }
+
+ @Override
+ public FieldNameValidator getValidatorForField(final String fieldName) {
+ if (fieldName.equals("updateMods")) {
+ return currentIndividualOperationIsReplace() ? replacingValidator.reset() : updatingValidator.reset();
+ }
+ return NoOpFieldNameValidator.INSTANCE;
+ }
+
+ private boolean currentIndividualOperationIsReplace() {
+ return getNamespacedModel(models, currentIndividualOperationIndex) instanceof ConcreteClientNamespacedReplaceOneModel;
+ }
+
+ private static final class ReplacingUpdateModsFieldValidator implements FieldNameValidator {
+ private boolean firstFieldSinceLastReset;
+
+ ReplacingUpdateModsFieldValidator() {
+ firstFieldSinceLastReset = true;
+ }
+
+ @Override
+ public boolean validate(final String fieldName) {
+ if (firstFieldSinceLastReset) {
+ // we must validate only the first field, and leave the rest up to the server
+ firstFieldSinceLastReset = false;
+ return ReplacingDocumentFieldNameValidator.INSTANCE.validate(fieldName);
+ }
+ return true;
+ }
+
+ @Override
+ public String getValidationErrorMessage(final String fieldName) {
+ return ReplacingDocumentFieldNameValidator.INSTANCE.getValidationErrorMessage(fieldName);
+ }
+
+ @Override
+ public FieldNameValidator getValidatorForField(final String fieldName) {
+ return NoOpFieldNameValidator.INSTANCE;
+ }
+
+ ReplacingUpdateModsFieldValidator reset() {
+ firstFieldSinceLastReset = true;
+ return this;
+ }
+ }
+
+ private static final class UpdatingUpdateModsFieldValidator implements FieldNameValidator {
+ private final UpdateFieldNameValidator delegate;
+ private boolean firstFieldSinceLastReset;
+
+ UpdatingUpdateModsFieldValidator() {
+ delegate = new UpdateFieldNameValidator();
+ firstFieldSinceLastReset = true;
+ }
+
+ @Override
+ public boolean validate(final String fieldName) {
+ if (firstFieldSinceLastReset) {
+ // we must validate only the first field, and leave the rest up to the server
+ firstFieldSinceLastReset = false;
+ return delegate.validate(fieldName);
+ }
+ return true;
+ }
+
+ @Override
+ public String getValidationErrorMessage(final String fieldName) {
+ return delegate.getValidationErrorMessage(fieldName);
+ }
+
+ @Override
+ public FieldNameValidator getValidatorForField(final String fieldName) {
+ return NoOpFieldNameValidator.INSTANCE;
+ }
+
+ @Override
+ public void start() {
+ delegate.start();
+ }
+
+ @Override
+ public void end() {
+ delegate.end();
+ }
+
+ UpdatingUpdateModsFieldValidator reset() {
+ delegate.reset();
+ firstFieldSinceLastReset = true;
+ return this;
+ }
+ }
+ }
+ }
+ }
+
+ static final class BatchResult {
+ private final int batchStartModelIndex;
+ private final BatchEncoder.EncodedBatchInfo encodedBatchInfo;
+ @Nullable
+ private final ExhaustiveClientBulkWriteCommandOkResponse response;
+ @Nullable
+ private final MongoWriteConcernException writeConcernException;
+
+ static BatchResult okResponse(
+ final int batchStartModelIndex,
+ final BatchEncoder.EncodedBatchInfo encodedBatchInfo,
+ final ExhaustiveClientBulkWriteCommandOkResponse response,
+ @Nullable final MongoWriteConcernException writeConcernException) {
+ return new BatchResult(batchStartModelIndex, encodedBatchInfo, assertNotNull(response), writeConcernException);
+ }
+
+ static BatchResult noResponse(final int batchStartModelIndex, final BatchEncoder.EncodedBatchInfo encodedBatchInfo) {
+ return new BatchResult(batchStartModelIndex, encodedBatchInfo, null, null);
+ }
+
+ private BatchResult(
+ final int batchStartModelIndex,
+ final BatchEncoder.EncodedBatchInfo encodedBatchInfo,
+ @Nullable final ExhaustiveClientBulkWriteCommandOkResponse response,
+ @Nullable final MongoWriteConcernException writeConcernException) {
+ this.batchStartModelIndex = batchStartModelIndex;
+ this.encodedBatchInfo = encodedBatchInfo;
+ this.response = response;
+ this.writeConcernException = writeConcernException;
+ }
+
+ int getBatchStartModelIndex() {
+ return batchStartModelIndex;
+ }
+
+ /**
+ * @see BatchEncoder.EncodedBatchInfo#getModelsCount()
+ */
+ int getBatchModelsCount() {
+ return encodedBatchInfo.getModelsCount();
+ }
+
+ boolean hasResponse() {
+ return response != null;
+ }
+
+ ExhaustiveClientBulkWriteCommandOkResponse getResponse() {
+ return assertNotNull(response);
+ }
+
+ @Nullable
+ MongoWriteConcernException getWriteConcernException() {
+ assertTrue(hasResponse());
+ return writeConcernException;
+ }
+
+ /**
+ * @see BatchEncoder.EncodedBatchInfo#getInsertModelDocumentIds()
+ */
+ Map<Integer, BsonValue> getInsertModelDocumentIds() {
+ assertTrue(hasResponse());
+ return encodedBatchInfo.getInsertModelDocumentIds();
+ }
+ }
+
+ /**
+ * Exactly one instance must be used per {@linkplain #executeBatch(int, WriteConcern, WriteBinding, ResultAccumulator) batch}.
+ */
+ @VisibleForTesting(otherwise = PRIVATE)
+ public final class BatchEncoder {
+ private EncodedBatchInfo encodedBatchInfo;
+
+ @VisibleForTesting(otherwise = PACKAGE)
+ public BatchEncoder() {
+ encodedBatchInfo = new EncodedBatchInfo();
+ }
+
+ /**
+ * Must be called at most once.
+ * Must not be called before calling
+ * {@link #encodeWriteModel(BsonBinaryWriter, ClientWriteModel, int, int)} at least once.
+ * Renders {@code this} unusable.
+ */
+ EncodedBatchInfo intoEncodedBatchInfo() {
+ EncodedBatchInfo result = assertNotNull(encodedBatchInfo);
+ encodedBatchInfo = null;
+ assertTrue(result.getModelsCount() > 0);
+ return result;
+ }
+
+ void reset() {
+ // we must not reset anything but `modelsCount`
+ assertNotNull(encodedBatchInfo).modelsCount = 0;
+ }
+
+ void reset(final int modelIndexInBatch) {
+ assertNotNull(encodedBatchInfo).modelsCount -= 1;
+ encodedBatchInfo.insertModelDocumentIds.remove(modelIndexInBatch);
+ }
+
+ void encodeWriteModel(
+ final BsonBinaryWriter writer,
+ final ClientWriteModel model,
+ final int modelIndexInBatch,
+ final int namespaceIndexInBatch) {
+ assertNotNull(encodedBatchInfo).modelsCount++;
+ writer.writeStartDocument();
+ if (model instanceof ConcreteClientInsertOneModel) {
+ writer.writeInt32("insert", namespaceIndexInBatch);
+ encodeWriteModelInternals(writer, (ConcreteClientInsertOneModel) model, modelIndexInBatch);
+ } else if (model instanceof ConcreteClientUpdateOneModel) {
+ writer.writeInt32("update", namespaceIndexInBatch);
+ writer.writeBoolean("multi", false);
+ encodeWriteModelInternals(writer, (ConcreteClientUpdateOneModel) model);
+ } else if (model instanceof ConcreteClientUpdateManyModel) {
+ writer.writeInt32("update", namespaceIndexInBatch);
+ writer.writeBoolean("multi", true);
+ encodeWriteModelInternals(writer, (ConcreteClientUpdateManyModel) model);
+ } else if (model instanceof ConcreteClientReplaceOneModel) {
+ writer.writeInt32("update", namespaceIndexInBatch);
+ encodeWriteModelInternals(writer, (ConcreteClientReplaceOneModel) model);
+ } else if (model instanceof ConcreteClientDeleteOneModel) {
+ writer.writeInt32("delete", namespaceIndexInBatch);
+ writer.writeBoolean("multi", false);
+ encodeWriteModelInternals(writer, (ConcreteClientDeleteOneModel) model);
+ } else if (model instanceof ConcreteClientDeleteManyModel) {
+ writer.writeInt32("delete", namespaceIndexInBatch);
+ writer.writeBoolean("multi", true);
+ encodeWriteModelInternals(writer, (ConcreteClientDeleteManyModel) model);
+ } else {
+ throw fail(model.getClass().toString());
+ }
+ writer.writeEndDocument();
+ }
+
+ private void encodeWriteModelInternals(
+ final BsonBinaryWriter writer,
+ final ConcreteClientInsertOneModel model,
+ final int modelIndexInBatch) {
+ writer.writeName("document");
+ Object document = model.getDocument();
+ assertNotNull(encodedBatchInfo).insertModelDocumentIds.compute(modelIndexInBatch, (k, knownModelDocumentId) -> {
+ IdHoldingBsonWriter documentIdHoldingBsonWriter = new IdHoldingBsonWriter(
+ writer,
+ // Reuse `knownModelDocumentId` if it may have been generated by `IdHoldingBsonWriter` in a previous attempt.
+ // If its type is not `BsonObjectId`, which happens only if `_id` was specified by the application,
+ // we know it could not have been generated.
+ knownModelDocumentId instanceof BsonObjectId ? knownModelDocumentId.asObjectId() : null);
+ encodeUsingRegistry(documentIdHoldingBsonWriter, document, COLLECTIBLE_DOCUMENT_ENCODER_CONTEXT);
+ return documentIdHoldingBsonWriter.getId();
+ });
+ }
+
+ private void encodeWriteModelInternals(final BsonWriter writer, final AbstractClientUpdateModel<?> model) {
+ writer.writeName("filter");
+ encodeUsingRegistry(writer, model.getFilter());
+ model.getUpdate().ifPresent(value -> {
+ writer.writeName("updateMods");
+ encodeUsingRegistry(writer, value);
+ });
+ model.getUpdatePipeline().ifPresent(value -> {
+ writer.writeStartArray("updateMods");
+ value.forEach(pipelineStage -> encodeUsingRegistry(writer, pipelineStage));
+ writer.writeEndArray();
+ });
+ AbstractClientUpdateOptions options = model.getOptions();
+ options.getArrayFilters().ifPresent(value -> {
+ writer.writeStartArray("arrayFilters");
+ value.forEach(filter -> encodeUsingRegistry(writer, filter));
+ writer.writeEndArray();
+ });
+ options.getCollation().ifPresent(value -> {
+ writer.writeName("collation");
+ encodeUsingRegistry(writer, value.asDocument());
+ });
+ options.getHint().ifPresent(hint -> {
+ writer.writeName("hint");
+ encodeUsingRegistry(writer, hint);
+ });
+ options.getHintString().ifPresent(value -> writer.writeString("hint", value));
+ options.isUpsert().ifPresent(value -> writer.writeBoolean("upsert", value));
+ }
+
+ private void encodeWriteModelInternals(final BsonBinaryWriter writer, final ConcreteClientReplaceOneModel model) {
+ writer.writeBoolean("multi", false);
+ writer.writeName("filter");
+ encodeUsingRegistry(writer, model.getFilter());
+ writer.writeName("updateMods");
+ encodeUsingRegistry(writer, model.getReplacement(), COLLECTIBLE_DOCUMENT_ENCODER_CONTEXT);
+ ConcreteClientReplaceOneOptions options = model.getOptions();
+ options.getCollation().ifPresent(value -> {
+ writer.writeName("collation");
+ encodeUsingRegistry(writer, value.asDocument());
+ });
+ options.getHint().ifPresent(value -> {
+ writer.writeName("hint");
+ encodeUsingRegistry(writer, value);
+ });
+ options.getHintString().ifPresent(value -> writer.writeString("hint", value));
+ options.isUpsert().ifPresent(value -> writer.writeBoolean("upsert", value));
+ }
+
+ private void encodeWriteModelInternals(final BsonWriter writer, final AbstractClientDeleteModel<?> model) {
+ writer.writeName("filter");
+ encodeUsingRegistry(writer, model.getFilter());
+ AbstractClientDeleteOptions options = model.getOptions();
+ options.getCollation().ifPresent(value -> {
+ writer.writeName("collation");
+ encodeUsingRegistry(writer, value.asDocument());
+ });
+ options.getHint().ifPresent(value -> {
+ writer.writeName("hint");
+ encodeUsingRegistry(writer, value);
+ });
+ options.getHintString().ifPresent(value -> writer.writeString("hint", value));
+ }
+
+ final class EncodedBatchInfo {
+ private final HashMap<Integer, BsonValue> insertModelDocumentIds;
+ private int modelsCount;
+
+ private EncodedBatchInfo() {
+ insertModelDocumentIds = new HashMap<>();
+ modelsCount = 0;
+ }
+
+ /**
+ * The key of each entry is the index of a model in the
+ * {@linkplain #executeBatch(int, WriteConcern, WriteBinding, ResultAccumulator) batch};
+ * the value is either the "_id" field value from {@linkplain ConcreteClientInsertOneModel#getDocument()},
+ * or the value generated for this field if it was absent.
+ */
+ Map<Integer, BsonValue> getInsertModelDocumentIds() {
+ return insertModelDocumentIds;
+ }
+
+ int getModelsCount() {
+ return modelsCount;
+ }
+ }
+ }
+}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java
index 4c428131853..2861bcf9ad5 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java
@@ -25,6 +25,7 @@
import com.mongodb.MongoSecurityException;
import com.mongodb.MongoServerException;
import com.mongodb.MongoSocketException;
+import com.mongodb.WriteConcern;
import com.mongodb.assertions.Assertions;
import com.mongodb.connection.ConnectionDescription;
import com.mongodb.connection.ServerDescription;
@@ -33,20 +34,40 @@
import com.mongodb.internal.connection.OperationContext;
import com.mongodb.internal.operation.OperationHelper.ResourceSupplierInternalException;
import com.mongodb.internal.operation.retry.AttachmentKeys;
+import com.mongodb.internal.session.SessionContext;
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
import java.util.List;
+import java.util.Optional;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import static com.mongodb.assertions.Assertions.assertFalse;
+import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
import static java.lang.String.format;
import static java.util.Arrays.asList;
@SuppressWarnings("overloads")
final class CommandOperationHelper {
+ static WriteConcern validateAndGetEffectiveWriteConcern(final WriteConcern writeConcernSetting, final SessionContext sessionContext)
+ throws MongoClientException {
+ boolean activeTransaction = sessionContext.hasActiveTransaction();
+ WriteConcern effectiveWriteConcern = activeTransaction
+ ? WriteConcern.ACKNOWLEDGED
+ : writeConcernSetting;
+ if (sessionContext.hasSession() && !sessionContext.isImplicitSession() && !activeTransaction && !effectiveWriteConcern.isAcknowledged()) {
+ throw new MongoClientException("Unacknowledged writes are not supported when using an explicit session");
+ }
+ return effectiveWriteConcern;
+ }
+
+ static Optional<WriteConcern> commandWriteConcern(final WriteConcern effectiveWriteConcern, final SessionContext sessionContext) {
+ return effectiveWriteConcern.isServerDefault() || sessionContext.hasActiveTransaction()
+ ? Optional.empty()
+ : Optional.of(effectiveWriteConcern);
+ }
interface CommandCreator {
BsonDocument create(
@@ -153,7 +174,26 @@ static boolean shouldAttemptToRetryRead(final RetryState retryState, final Throw
return decision;
}
- static boolean shouldAttemptToRetryWrite(final RetryState retryState, final Throwable attemptFailure) {
+ static boolean loggingShouldAttemptToRetryWriteAndAddRetryableLabel(final RetryState retryState, final Throwable attemptFailure) {
+ Throwable attemptFailureNotToBeRetried = getAttemptFailureNotToRetryOrAddRetryableLabel(retryState, attemptFailure);
+ boolean decision = attemptFailureNotToBeRetried == null;
+ if (!decision && retryState.attachment(AttachmentKeys.retryableCommandFlag()).orElse(false)) {
+ logUnableToRetry(
+ retryState.attachment(AttachmentKeys.commandDescriptionSupplier()).orElse(null),
+ assertNotNull(attemptFailureNotToBeRetried));
+ }
+ return decision;
+ }
+
+ static boolean shouldAttemptToRetryWriteAndAddRetryableLabel(final RetryState retryState, final Throwable attemptFailure) {
+ return getAttemptFailureNotToRetryOrAddRetryableLabel(retryState, attemptFailure) == null;
+ }
+
+ /**
+ * @return {@code null} if the decision to retry is {@code true}. Otherwise, returns the {@link Throwable} that must not be retried.
+ */
+ @Nullable
+ private static Throwable getAttemptFailureNotToRetryOrAddRetryableLabel(final RetryState retryState, final Throwable attemptFailure) {
Throwable failure = attemptFailure instanceof ResourceSupplierInternalException ? attemptFailure.getCause() : attemptFailure;
boolean decision = false;
MongoException exceptionRetryableRegardlessOfCommand = null;
@@ -170,11 +210,9 @@ static boolean shouldAttemptToRetryWrite(final RetryState retryState, final Thro
} else if (decideRetryableAndAddRetryableWriteErrorLabel(failure, retryState.attachment(AttachmentKeys.maxWireVersion())
.orElse(null))) {
decision = true;
- } else {
- logUnableToRetry(retryState.attachment(AttachmentKeys.commandDescriptionSupplier()).orElse(null), failure);
}
}
- return decision;
+ return decision ? null : assertNotNull(failure);
}
static boolean isRetryWritesEnabled(@Nullable final BsonDocument command) {
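The two helpers added to CommandOperationHelper encode three rules: inside a transaction the effective write concern is always ACKNOWLEDGED, an unacknowledged write concern combined with an explicit session outside a transaction is rejected, and a server-default write concern (or any write concern inside a transaction) is omitted from the command. A minimal sketch of that decision logic follows, with the session state flattened into two booleans; this paraphrase is not part of the patch.

import com.mongodb.MongoClientException;
import com.mongodb.WriteConcern;

final class EffectiveWriteConcernSketch {
    // Same rules as validateAndGetEffectiveWriteConcern, restated without SessionContext.
    static WriteConcern effective(final WriteConcern setting, final boolean explicitSession, final boolean activeTransaction) {
        WriteConcern effective = activeTransaction ? WriteConcern.ACKNOWLEDGED : setting;
        if (explicitSession && !activeTransaction && !effective.isAcknowledged()) {
            throw new MongoClientException("Unacknowledged writes are not supported when using an explicit session");
        }
        return effective;
    }

    public static void main(final String[] args) {
        System.out.println(effective(WriteConcern.UNACKNOWLEDGED, false, false)); // honored with an implicit session
        System.out.println(effective(WriteConcern.UNACKNOWLEDGED, false, true));  // upgraded to ACKNOWLEDGED in a transaction
        try {
            effective(WriteConcern.UNACKNOWLEDGED, true, false);                  // rejected with an explicit session
        } catch (MongoClientException e) {
            System.out.println(e.getMessage());
        }
    }
}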
diff --git a/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java b/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java
index a32ce6d5153..06d392bceb2 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java
@@ -16,7 +16,6 @@
package com.mongodb.internal.operation;
-import com.mongodb.MongoClientException;
import com.mongodb.MongoException;
import com.mongodb.MongoNamespace;
import com.mongodb.WriteConcern;
@@ -63,8 +62,10 @@
import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncSourceAndConnection;
import static com.mongodb.internal.operation.CommandOperationHelper.addRetryableWriteErrorLabel;
import static com.mongodb.internal.operation.CommandOperationHelper.logRetryExecute;
+import static com.mongodb.internal.operation.CommandOperationHelper.loggingShouldAttemptToRetryWriteAndAddRetryableLabel;
import static com.mongodb.internal.operation.CommandOperationHelper.onRetryableWriteAttemptFailure;
import static com.mongodb.internal.operation.CommandOperationHelper.transformWriteException;
+import static com.mongodb.internal.operation.CommandOperationHelper.validateAndGetEffectiveWriteConcern;
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
import static com.mongodb.internal.operation.OperationHelper.isRetryableWrite;
import static com.mongodb.internal.operation.OperationHelper.validateWriteRequests;
@@ -164,7 +165,7 @@ private boolean shouldAttemptToRetryWrite(final RetryState retryState, final Thr
if (bulkWriteTracker.lastAttempt()) {
return false;
}
- boolean decision = CommandOperationHelper.shouldAttemptToRetryWrite(retryState, attemptFailure);
+ boolean decision = loggingShouldAttemptToRetryWriteAndAddRetryableLabel(retryState, attemptFailure);
if (decision) {
/* The attempt counter maintained by `RetryState` is updated after (in the happens-before order) testing a retry predicate,
* and only if the predicate completes normally. Here we maintain attempt counters manually, and we emulate the
@@ -274,7 +275,7 @@ private BulkWriteResult executeBulkWriteBatch(
if (currentBulkWriteTracker.lastAttempt()) {
addRetryableWriteErrorLabel(writeConcernBasedError, maxWireVersion);
addErrorLabelsToWriteConcern(result.getDocument("writeConcernError"), writeConcernBasedError.getErrorLabels());
- } else if (CommandOperationHelper.shouldAttemptToRetryWrite(retryState, writeConcernBasedError)) {
+ } else if (loggingShouldAttemptToRetryWriteAndAddRetryableLabel(retryState, writeConcernBasedError)) {
throw new MongoWriteConcernWithResponseException(writeConcernBasedError, result);
}
}
@@ -328,7 +329,7 @@ private void executeBulkWriteBatchAsync(
addRetryableWriteErrorLabel(writeConcernBasedError, maxWireVersion);
addErrorLabelsToWriteConcern(result.getDocument("writeConcernError"),
writeConcernBasedError.getErrorLabels());
- } else if (CommandOperationHelper.shouldAttemptToRetryWrite(retryState, writeConcernBasedError)) {
+ } else if (loggingShouldAttemptToRetryWriteAndAddRetryableLabel(retryState, writeConcernBasedError)) {
iterationCallback.onResult(null,
new MongoWriteConcernWithResponseException(writeConcernBasedError, result));
return;
@@ -420,8 +421,7 @@ private BsonDocument executeCommand(
final Connection connection,
final BulkWriteBatch batch) {
return connection.command(namespace.getDatabaseName(), batch.getCommand(), NoOpFieldNameValidator.INSTANCE, null, batch.getDecoder(),
- operationContext, shouldExpectResponse(batch, effectiveWriteConcern),
- batch.getPayload(), batch.getFieldNameValidator());
+ operationContext, shouldExpectResponse(batch, effectiveWriteConcern), batch.getPayload());
}
private void executeCommandAsync(
@@ -431,26 +431,7 @@ private void executeCommandAsync(
final BulkWriteBatch batch,
final SingleResultCallback<BsonDocument> callback) {
connection.commandAsync(namespace.getDatabaseName(), batch.getCommand(), NoOpFieldNameValidator.INSTANCE, null, batch.getDecoder(),
- operationContext, shouldExpectResponse(batch, effectiveWriteConcern),
- batch.getPayload(), batch.getFieldNameValidator(), callback);
- }
-
- private static WriteConcern validateAndGetEffectiveWriteConcern(final WriteConcern writeConcernSetting, final SessionContext sessionContext)
- throws MongoClientException {
- boolean activeTransaction = sessionContext.hasActiveTransaction();
- WriteConcern effectiveWriteConcern = activeTransaction
- ? WriteConcern.ACKNOWLEDGED
- : writeConcernSetting;
- if (sessionContext.hasSession() && !sessionContext.isImplicitSession() && !activeTransaction && !effectiveWriteConcern.isAcknowledged()) {
- throw new MongoClientException("Unacknowledged writes are not supported when using an explicit session");
- }
- return effectiveWriteConcern;
- }
-
- static Optional<WriteConcern> commandWriteConcern(final WriteConcern effectiveWriteConcern, final SessionContext sessionContext) {
- return effectiveWriteConcern.isServerDefault() || sessionContext.hasActiveTransaction()
- ? Optional.empty()
- : Optional.of(effectiveWriteConcern);
+ operationContext, shouldExpectResponse(batch, effectiveWriteConcern), batch.getPayload(), callback);
}
private boolean shouldExpectResponse(final BulkWriteBatch batch, final WriteConcern effectiveWriteConcern) {
diff --git a/driver-core/src/main/com/mongodb/internal/operation/OperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/OperationHelper.java
index 04318635a06..b7a4997e639 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/OperationHelper.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/OperationHelper.java
@@ -16,7 +16,9 @@
package com.mongodb.internal.operation;
+import com.mongodb.ClientBulkWriteException;
import com.mongodb.MongoClientException;
+import com.mongodb.MongoException;
import com.mongodb.WriteConcern;
import com.mongodb.client.cursor.TimeoutMode;
import com.mongodb.client.model.Collation;
@@ -47,7 +49,10 @@
import static com.mongodb.internal.operation.ServerVersionHelper.serverIsLessThanVersionFourDotTwo;
import static java.lang.String.format;
-final class OperationHelper {
+/**
+ * This class is not part of the public API and may be removed or changed at any time.
+ */
+public final class OperationHelper {
public static final Logger LOGGER = Loggers.getLogger("operation");
static void validateCollationAndWriteConcern(@Nullable final Collation collation, final WriteConcern writeConcern) {
@@ -202,6 +207,21 @@ static void setNonTailableCursorMaxTimeSupplier(final TimeoutMode timeoutMode, f
}
}
+ /**
+ * Returns the {@link MongoException} that carries or should carry
+ * the {@linkplain MongoException#getCode() error code} and {@linkplain MongoException#getErrorLabels() error labels}.
+ * This method is needed because exceptions like {@link ClientBulkWriteException} do not carry that data themselves.
+ */
+ public static MongoException unwrap(final MongoException exception) {
+ MongoException result = exception;
+ if (exception instanceof ClientBulkWriteException) {
+ MongoException topLevelError = ((ClientBulkWriteException) exception).getCause();
+ result = topLevelError == null ? exception : topLevelError;
+ }
+ return result;
+ }
+
/**
* This internal exception is used to
*
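A hypothetical caller-side sketch of the new OperationHelper.unwrap (not part of the patch): retry and error-label checks should be made against the unwrapped exception, because ClientBulkWriteException keeps the server's top-level error as its cause rather than carrying the error code and labels itself.

import com.mongodb.MongoException;
import com.mongodb.internal.operation.OperationHelper;

final class UnwrapSketch {
    // Returns whether the (possibly wrapped) failure carries the RetryableWriteError label.
    static boolean hasRetryableWriteLabel(final MongoException failure) {
        MongoException carrier = OperationHelper.unwrap(failure);
        return carrier.hasErrorLabel("RetryableWriteError");
    }
}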
diff --git a/driver-core/src/main/com/mongodb/internal/operation/Operations.java b/driver-core/src/main/com/mongodb/internal/operation/Operations.java
index ecdd215ba91..88af67a1204 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/Operations.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/Operations.java
@@ -54,6 +54,8 @@
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.ValidationOptions;
import com.mongodb.client.model.WriteModel;
+import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
+import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import com.mongodb.internal.bulk.DeleteRequest;
@@ -727,6 +729,12 @@ ChangeStreamOperation changeStream(final FullDocument fullDoc
.retryReads(retryReads);
}
+ ClientBulkWriteOperation clientBulkWriteOperation(
+ final List<? extends ClientNamespacedWriteModel> clientWriteModels,
+ @Nullable final ClientBulkWriteOptions options) {
+ return new ClientBulkWriteOperation(clientWriteModels, options, writeConcern, retryWrites, codecRegistry);
+ }
+
private Codec<TDocument> getCodec() {
return codecRegistry.get(documentClass);
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java
index 62da7cde2c8..6d013df59ba 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java
@@ -303,7 +303,7 @@ static <D, T> T createReadCommandAndExecute(
static <R> Supplier<R> decorateWriteWithRetries(final RetryState retryState,
final OperationContext operationContext, final Supplier<R> writeFunction) {
return new RetryingSyncSupplier<>(retryState, onRetryableWriteAttemptFailure(operationContext),
- CommandOperationHelper::shouldAttemptToRetryWrite, () -> {
+ CommandOperationHelper::loggingShouldAttemptToRetryWriteAndAddRetryableLabel, () -> {
logRetryExecute(retryState, operationContext);
return writeFunction.get();
});
@@ -334,8 +334,8 @@ static CommandReadTransformer> singleBatchCurso
connection.getDescription().getServerAddress());
}
- static <T> BatchCursor<T> cursorDocumentToBatchCursor(final TimeoutMode timeoutMode, final BsonDocument cursorDocument,
- final int batchSize, final Decoder<T> decoder, final BsonValue comment, final ConnectionSource source,
+ static <T> CommandBatchCursor<T> cursorDocumentToBatchCursor(final TimeoutMode timeoutMode, final BsonDocument cursorDocument,
+ final int batchSize, final Decoder<T> decoder, @Nullable final BsonValue comment, final ConnectionSource source,
final Connection connection) {
return new CommandBatchCursor<>(timeoutMode, cursorDocument, batchSize, 0, decoder, comment, source, connection);
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/SyncOperations.java b/driver-core/src/main/com/mongodb/internal/operation/SyncOperations.java
index 952a35fe7fe..72f738ec971 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/SyncOperations.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/SyncOperations.java
@@ -44,8 +44,11 @@
import com.mongodb.client.model.SearchIndexModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
+import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
+import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
+import com.mongodb.client.model.bulk.ClientBulkWriteResult;
import com.mongodb.internal.TimeoutSettings;
import com.mongodb.internal.client.model.AggregationLevel;
import com.mongodb.internal.client.model.FindOptions;
@@ -359,4 +362,10 @@ public ReadOperation> changeStream(final FullDocu
return operations.changeStream(fullDocument, fullDocumentBeforeChange, pipeline, decoder, changeStreamLevel, batchSize,
collation, comment, resumeToken, startAtOperationTime, startAfter, showExpandedEvents);
}
+
+ public WriteOperation<ClientBulkWriteResult> clientBulkWriteOperation(
+ final List<? extends ClientNamespacedWriteModel> clientWriteModels,
+ @Nullable final ClientBulkWriteOptions options) {
+ return operations.clientBulkWriteOperation(clientWriteModels, options);
+ }
}
diff --git a/driver-core/src/main/com/mongodb/internal/session/SessionContext.java b/driver-core/src/main/com/mongodb/internal/session/SessionContext.java
index 6c55c526d45..4a8902799ec 100644
--- a/driver-core/src/main/com/mongodb/internal/session/SessionContext.java
+++ b/driver-core/src/main/com/mongodb/internal/session/SessionContext.java
@@ -48,7 +48,7 @@ public interface SessionContext {
/**
* Advance the transaction number.
*
- * @return the next transaction number for the session
+ * @return the next non-negative transaction number for the session
*/
long advanceTransactionNumber();
diff --git a/driver-core/src/main/com/mongodb/internal/time/TimePoint.java b/driver-core/src/main/com/mongodb/internal/time/TimePoint.java
index d0b95970511..811065d13a6 100644
--- a/driver-core/src/main/com/mongodb/internal/time/TimePoint.java
+++ b/driver-core/src/main/com/mongodb/internal/time/TimePoint.java
@@ -28,6 +28,7 @@
import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
/**
@@ -234,7 +235,7 @@ public int hashCode() {
public String toString() {
String remainingMs = isInfinite()
? "infinite"
- : "" + TimeUnit.MILLISECONDS.convert(currentNanos() - assertNotNull(nanos), NANOSECONDS);
+ : "" + remaining(MILLISECONDS);
return "TimePoint{"
+ "nanos=" + nanos
+ ", remainingMs=" + remainingMs
diff --git a/driver-core/src/main/com/mongodb/internal/validator/UpdateFieldNameValidator.java b/driver-core/src/main/com/mongodb/internal/validator/UpdateFieldNameValidator.java
index 40762bfb5fb..fc59b0cc312 100644
--- a/driver-core/src/main/com/mongodb/internal/validator/UpdateFieldNameValidator.java
+++ b/driver-core/src/main/com/mongodb/internal/validator/UpdateFieldNameValidator.java
@@ -48,7 +48,7 @@ public FieldNameValidator getValidatorForField(final String fieldName) {
@Override
public void start() {
- encounteredField = false;
+ reset();
}
@Override
@@ -57,4 +57,9 @@ public void end() {
throw new IllegalArgumentException("Invalid BSON document for an update. The document may not be empty.");
}
}
+
+ public UpdateFieldNameValidator reset() {
+ encounteredField = false;
+ return this;
+ }
}
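The reuse contract that the bulk-write validators above rely on, shown as a small standalone sketch (illustrative, not part of the patch): start() now clears the per-document state via reset(), and end() still rejects an update document that contained no fields.

import com.mongodb.internal.validator.UpdateFieldNameValidator;

public final class UpdateValidatorSketch {
    public static void main(final String[] args) {
        UpdateFieldNameValidator validator = new UpdateFieldNameValidator();

        validator.start();
        validator.validate("$set");   // update operators are accepted
        validator.end();              // a field was seen, so this passes

        validator.reset();            // reuse the same instance for the next document
        validator.start();
        try {
            validator.end();          // no fields between start() and end()
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}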
diff --git a/driver-core/src/test/functional/com/mongodb/ClusterFixture.java b/driver-core/src/test/functional/com/mongodb/ClusterFixture.java
index a889856f394..dde9682de8d 100644
--- a/driver-core/src/test/functional/com/mongodb/ClusterFixture.java
+++ b/driver-core/src/test/functional/com/mongodb/ClusterFixture.java
@@ -125,7 +125,7 @@ public final class ClusterFixture {
private static final String MONGODB_OCSP_SHOULD_SUCCEED = "org.mongodb.test.ocsp.tls.should.succeed";
private static final String DEFAULT_DATABASE_NAME = "JavaDriverTest";
private static final int COMMAND_NOT_FOUND_ERROR_CODE = 59;
- public static final long TIMEOUT = 60L;
+ public static final long TIMEOUT = 120L;
public static final Duration TIMEOUT_DURATION = Duration.ofSeconds(TIMEOUT);
public static final TimeoutSettings TIMEOUT_SETTINGS = new TimeoutSettings(30_000, 10_000, 0, null, SECONDS.toMillis(5));
diff --git a/driver-core/src/test/functional/com/mongodb/client/syncadapter/SyncConnection.java b/driver-core/src/test/functional/com/mongodb/client/syncadapter/SyncConnection.java
index 1cc3904749d..0a96d5ab0cf 100644
--- a/driver-core/src/test/functional/com/mongodb/client/syncadapter/SyncConnection.java
+++ b/driver-core/src/test/functional/com/mongodb/client/syncadapter/SyncConnection.java
@@ -19,8 +19,8 @@
import com.mongodb.connection.ConnectionDescription;
import com.mongodb.internal.connection.AsyncConnection;
import com.mongodb.internal.connection.Connection;
+import com.mongodb.internal.connection.MessageSequences;
import com.mongodb.internal.connection.OperationContext;
-import com.mongodb.internal.connection.SplittablePayload;
import org.bson.BsonDocument;
import org.bson.FieldNameValidator;
import org.bson.codecs.Decoder;
@@ -65,11 +65,10 @@ public T command(final String database, final BsonDocument command, final Fi
@Override
public <T> T command(final String database, final BsonDocument command, final FieldNameValidator commandFieldNameValidator,
final ReadPreference readPreference, final Decoder<T> commandResultDecoder,
- final OperationContext operationContext, final boolean responseExpected, final SplittablePayload payload,
- final FieldNameValidator payloadFieldNameValidator) {
+ final OperationContext operationContext, final boolean responseExpected, final MessageSequences sequences) {
SupplyingCallback<T> callback = new SupplyingCallback<>();
wrapped.commandAsync(database, command, commandFieldNameValidator, readPreference, commandResultDecoder, operationContext,
- responseExpected, payload, payloadFieldNameValidator, callback);
+ responseExpected, sequences, callback);
return callback.get();
}
diff --git a/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java b/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java
index adce165ee51..3e58712ca9c 100644
--- a/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java
+++ b/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java
@@ -357,9 +357,17 @@ public void replaceOne(final Bson filter, final Bson update, final boolean isUps
}
public void deleteOne(final Bson filter) {
+ delete(filter, false);
+ }
+
+ public void deleteMany(final Bson filter) {
+ delete(filter, true);
+ }
+
+ private void delete(final Bson filter, final boolean multi) {
new MixedBulkWriteOperation(namespace,
- singletonList(new DeleteRequest(filter.toBsonDocument(Document.class, registry))),
- true, WriteConcern.ACKNOWLEDGED, false)
+ singletonList(new DeleteRequest(filter.toBsonDocument(Document.class, registry)).multi(multi)),
+ true, WriteConcern.ACKNOWLEDGED, false)
.execute(getBinding());
}
diff --git a/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandBatchCursorFunctionalTest.java b/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandBatchCursorFunctionalTest.java
index a272f8b0f67..88dc199ee29 100644
--- a/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandBatchCursorFunctionalTest.java
+++ b/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandBatchCursorFunctionalTest.java
@@ -21,8 +21,10 @@
import com.mongodb.MongoQueryException;
import com.mongodb.ReadPreference;
import com.mongodb.ServerCursor;
+import com.mongodb.async.FutureResultCallback;
import com.mongodb.client.cursor.TimeoutMode;
import com.mongodb.client.model.CreateCollectionOptions;
+import com.mongodb.client.model.Filters;
import com.mongodb.client.model.OperationTest;
import com.mongodb.internal.binding.AsyncConnectionSource;
import com.mongodb.internal.connection.AsyncConnection;
@@ -103,6 +105,69 @@ void cleanup() {
});
}
+ @Test
+ @DisplayName("should exhaust cursor with multiple batches")
+ void shouldExhaustCursorAsyncWithMultipleBatches() {
+ // given
+ BsonDocument commandResult = executeFindCommand(0, 3); // Fetch in batches of size 3
+ cursor = new AsyncCommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER,
+ null, connectionSource, connection);
+
+ // when
+ FutureResultCallback<List<List<Document>>> futureCallback = new FutureResultCallback<>();
+ cursor.exhaust(futureCallback);
+
+ // then
+ List<List<Document>> resultBatches = futureCallback.get(5, TimeUnit.SECONDS);
+
+ assertTrue(cursor.isClosed(), "Expected cursor to be closed.");
+ assertEquals(4, resultBatches.size(), "Expected 4 batches for 10 documents with batch size of 3.");
+
+ int totalDocuments = resultBatches.stream().mapToInt(List::size).sum();
+ assertEquals(10, totalDocuments, "Expected a total of 10 documents.");
+ }
+
+ @Test
+ @DisplayName("should exhaust cursor with closed cursor")
+ void shouldExhaustCursorAsyncWithClosedCursor() {
+ // given
+ BsonDocument commandResult = executeFindCommand(0, 3);
+ cursor = new AsyncCommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER,
+ null, connectionSource, connection);
+
+ cursor.close();
+
+ // when
+ FutureResultCallback<List<List<Document>>> futureCallback = new FutureResultCallback<>();
+ cursor.exhaust(futureCallback);
+
+ // then
+ IllegalStateException illegalStateException = assertThrows(IllegalStateException.class, () -> {
+ futureCallback.get(5, TimeUnit.SECONDS);
+ }, "Expected an exception when operating on a closed cursor.");
+ assertEquals("Cursor has been closed", illegalStateException.getMessage());
+ }
+
+ @Test
+ @DisplayName("should exhaust cursor with empty cursor")
+ void shouldExhaustCursorAsyncWithEmptyCursor() {
+ // given
+ getCollectionHelper().deleteMany(Filters.empty());
+
+ BsonDocument commandResult = executeFindCommand(0, 3); // No documents to fetch
+ cursor = new AsyncCommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER,
+ null, connectionSource, connection);
+
+ // when
+ FutureResultCallback<List<List<Document>>> futureCallback = new FutureResultCallback<>();
+ cursor.exhaust(futureCallback);
+
+ // then
+ List<List<Document>> resultBatches = futureCallback.get(5, TimeUnit.SECONDS);
+ assertTrue(resultBatches.isEmpty(), "Expected no batches for an empty cursor.");
+ assertTrue(cursor.isClosed(), "Expected cursor to be closed.");
+ }
+
@Test
@DisplayName("server cursor should not be null")
void theServerCursorShouldNotBeNull() {
diff --git a/driver-core/src/test/functional/com/mongodb/internal/operation/CommandBatchCursorFunctionalTest.java b/driver-core/src/test/functional/com/mongodb/internal/operation/CommandBatchCursorFunctionalTest.java
index 57caf3bdbfc..d9861c71659 100644
--- a/driver-core/src/test/functional/com/mongodb/internal/operation/CommandBatchCursorFunctionalTest.java
+++ b/driver-core/src/test/functional/com/mongodb/internal/operation/CommandBatchCursorFunctionalTest.java
@@ -22,6 +22,7 @@
import com.mongodb.ServerCursor;
import com.mongodb.client.cursor.TimeoutMode;
import com.mongodb.client.model.CreateCollectionOptions;
+import com.mongodb.client.model.Filters;
import com.mongodb.client.model.OperationTest;
import com.mongodb.internal.binding.ConnectionSource;
import com.mongodb.internal.connection.Connection;
@@ -101,6 +102,55 @@ void cleanup() {
});
}
+ @Test
+ @DisplayName("should exhaust cursor with multiple batches")
+ void shouldExhaustCursorWithMultipleBatches() {
+ // given
+ BsonDocument commandResult = executeFindCommand(0, 3); // Fetch in batches of size 3
+ cursor = new CommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER,
+ null, connectionSource, connection);
+
+ // when
+ List