diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java index 71e6e3baa5e1..aa4250955692 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/BridgeInternal.java @@ -15,6 +15,7 @@ import com.azure.cosmos.implementation.MetadataDiagnosticsContext; import com.azure.cosmos.implementation.QueryMetrics; import com.azure.cosmos.implementation.ReplicationPolicy; +import com.azure.cosmos.implementation.RequestOptions; import com.azure.cosmos.implementation.RequestTimeline; import com.azure.cosmos.implementation.Resource; import com.azure.cosmos.implementation.ResourceResponse; @@ -607,4 +608,58 @@ public static Duration getRequestTimeoutFromDirectConnectionConfig(DirectConnect public static Duration getRequestTimeoutFromGatewayConnectionConfig(GatewayConnectionConfig gatewayConnectionConfig) { return gatewayConnectionConfig.getRequestTimeout(); } + + @Warning(value = INTERNAL_USE_ONLY_WARNING) + public static String getOperationValueForCosmosItemOperationType(CosmosItemOperationType cosmosItemOperationType) { + return cosmosItemOperationType.getOperationValue(); + } + + @Warning(value = INTERNAL_USE_ONLY_WARNING) + public static RequestOptions toRequestOptions(TransactionalBatchRequestOptions transactionalBatchRequestOptions) { + return transactionalBatchRequestOptions.toRequestOptions(); + } + + @Warning(value = INTERNAL_USE_ONLY_WARNING) + public static TransactionalBatchOperationResult createTransactionBatchResult( + String eTag, + double requestCharge, + ObjectNode resourceObject, + int statusCode, + Duration retryAfter, + int subStatusCode, + CosmosItemOperation cosmosItemOperation) { + + return new TransactionalBatchOperationResult( + eTag, + requestCharge, + resourceObject, + statusCode, + retryAfter, + subStatusCode, + cosmosItemOperation); + } + + @Warning(value = INTERNAL_USE_ONLY_WARNING) + public static TransactionalBatchResponse createTransactionBatchResponse( + int responseStatusCode, + int responseSubStatusCode, + String errorMessage, + Map responseHeaders, + CosmosDiagnostics cosmosDiagnostics) { + + return new TransactionalBatchResponse( + responseStatusCode, + responseSubStatusCode, + errorMessage, + responseHeaders, + cosmosDiagnostics); + } + + @Warning(value = INTERNAL_USE_ONLY_WARNING) + public static void addTransactionBatchResultInResponse( + TransactionalBatchResponse transactionalBatchResponse, + List transactionalBatchOperationResults) { + + transactionalBatchResponse.addAll(transactionalBatchOperationResults); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java index 19c30c48d536..8d0a3a151c69 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java @@ -15,6 +15,7 @@ import com.azure.cosmos.implementation.TracerProvider; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; +import com.azure.cosmos.implementation.batch.BatchExecutor; import com.azure.cosmos.implementation.query.QueryInfo; import com.azure.cosmos.models.CosmosConflictProperties; import com.azure.cosmos.models.CosmosContainerProperties; @@ -67,6 +68,7 @@ public class CosmosAsyncContainer { private final String queryItemsSpanName; private final String readAllConflictsSpanName; private final String queryConflictsSpanName; + private final String batchSpanName; private CosmosAsyncScripts scripts; CosmosAsyncContainer(String id, CosmosAsyncDatabase database) { @@ -87,6 +89,7 @@ public class CosmosAsyncContainer { this.queryItemsSpanName = "queryItems." + this.id; this.readAllConflictsSpanName = "readAllConflicts." + this.id; this.queryConflictsSpanName = "queryConflicts." + this.id; + this.batchSpanName = "transactionalBatch." + this.id; } /** @@ -493,6 +496,95 @@ private T transform(Object object, Class classType) { return Utils.getSimpleObjectMapper().convertValue(object, classType); } + /** + * Executes the transactional batch. + * + * @param transactionalBatch Batch having list of operation and partition key which will be executed by this container. + * + * @return A Mono response which contains details of execution of the transactional batch. + *

+ * If the transactional batch executes successfully, the value returned by {@link + * TransactionalBatchResponse#getStatusCode} on the response returned will be set to 200}. + *

+ * If an operation within the transactional batch fails during execution, no changes from the batch will be + * committed and the status of the failing operation is made available by {@link + * TransactionalBatchResponse#getStatusCode} or by the exception. To obtain information about the operations + * that failed in case of some user error like conflict, not found etc, the response can be enumerated. + * This returns {@link TransactionalBatchOperationResult} instances corresponding to each operation in the + * transactional batch in the order they were added to the transactional batch. + * For a result corresponding to an operation within the transactional batch, use + * {@link TransactionalBatchOperationResult#getStatusCode} + * to access the status of the operation. If the operation was not executed or it was aborted due to the failure of + * another operation within the transactional batch, the value of this field will be 424; + * for the operation that caused the batch to abort, the value of this field + * will indicate the cause of failure. + *

+ * If there are issues such as request timeouts, Gone, session not available, network failure + * or if the service somehow returns 5xx then the Mono will return error instead of TransactionalBatchResponse. + *

+ * Use {@link TransactionalBatchResponse#isSuccessStatusCode} on the response returned to ensure that the + * transactional batch succeeded. + */ + @Beta(Beta.SinceVersion.V4_7_0) + public Mono executeTransactionalBatch(TransactionalBatch transactionalBatch) { + return executeTransactionalBatch(transactionalBatch, new TransactionalBatchRequestOptions()); + } + + /** + * Executes the transactional batch. + * + * @param transactionalBatch Batch having list of operation and partition key which will be executed by this container. + * @param requestOptions Options that apply specifically to batch request. + * + * @return A Mono response which contains details of execution of the transactional batch. + *

+ * If the transactional batch executes successfully, the value returned by {@link + * TransactionalBatchResponse#getStatusCode} on the response returned will be set to 200}. + *

+ * If an operation within the transactional batch fails during execution, no changes from the batch will be + * committed and the status of the failing operation is made available by {@link + * TransactionalBatchResponse#getStatusCode} or by the exception. To obtain information about the operations + * that failed in case of some user error like conflict, not found etc, the response can be enumerated. + * This returns {@link TransactionalBatchOperationResult} instances corresponding to each operation in the + * transactional batch in the order they were added to the transactional batch. + * For a result corresponding to an operation within the transactional batch, use + * {@link TransactionalBatchOperationResult#getStatusCode} + * to access the status of the operation. If the operation was not executed or it was aborted due to the failure of + * another operation within the transactional batch, the value of this field will be 424; + * for the operation that caused the batch to abort, the value of this field + * will indicate the cause of failure. + *

+ * If there are issues such as request timeouts, Gone, session not available, network failure + * or if the service somehow returns 5xx then the Mono will return error instead of TransactionalBatchResponse. + *

+ * Use {@link TransactionalBatchResponse#isSuccessStatusCode} on the response returned to ensure that the + * transactional batch succeeded. + */ + @Beta(Beta.SinceVersion.V4_7_0) + public Mono executeTransactionalBatch( + TransactionalBatch transactionalBatch, + TransactionalBatchRequestOptions requestOptions) { + + if (requestOptions == null) { + requestOptions = new TransactionalBatchRequestOptions(); + } + + final TransactionalBatchRequestOptions transactionalBatchRequestOptions = requestOptions; + + return withContext(context -> { + final BatchExecutor executor = new BatchExecutor(this, transactionalBatch, transactionalBatchRequestOptions); + final Mono responseMono = executor.executeAsync(); + + return database.getClient().getTracerProvider(). + traceEnabledBatchResponsePublisher( + responseMono, + context, + this.batchSpanName, + database.getId(), + database.getClient().getServiceEndpoint()); + }); + } + /** * Reads an item. *

diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosContainer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosContainer.java index a9f14e603b01..2b7379f1cc53 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosContainer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosContainer.java @@ -270,6 +270,19 @@ private CosmosItemResponse blockDeleteItemResponse(Mono batchResponseMono) { + try { + return batchResponseMono.block(); + } catch (Exception ex) { + final Throwable throwable = Exceptions.unwrap(ex); + if (throwable instanceof CosmosException) { + throw (CosmosException) throwable; + } else { + throw ex; + } + } + } + /** * Read all items as {@link CosmosPagedIterable} in the current container. * @@ -453,6 +466,78 @@ public CosmosItemResponse deleteItem(T item, CosmosItemRequestOption return this.blockDeleteItemResponse(asyncContainer.deleteItem(item, options)); } + /** + * Executes the transactional batch. + * + * @param transactionalBatch Batch having list of operation and partition key which will be executed by this container. + * + * @return A TransactionalBatchResponse which contains details of execution of the transactional batch. + *

+ * If the transactional batch executes successfully, the value returned by {@link + * TransactionalBatchResponse#getStatusCode} on the response returned will be set to 200}. + *

+ * If an operation within the transactional batch fails during execution, no changes from the batch will be + * committed and the status of the failing operation is made available by {@link + * TransactionalBatchResponse#getStatusCode} or by the exception. To obtain information about the operations + * that failed in case of some user error like conflict, not found etc, the response can be enumerated. + * This returns {@link TransactionalBatchOperationResult} instances corresponding to each operation in the + * transactional batch in the order they were added to the transactional batch. + * For a result corresponding to an operation within the transactional batch, use + * {@link TransactionalBatchOperationResult#getStatusCode} + * to access the status of the operation. If the operation was not executed or it was aborted due to the failure of + * another operation within the transactional batch, the value of this field will be 424; + * for the operation that caused the batch to abort, the value of this field + * will indicate the cause of failure. + *

+ * If there are issues such as request timeouts, Gone, session not available, network failure + * or if the service somehow returns 5xx then this will throw an exception instead of returning a TransactionalBatchResponse. + *

+ * Use {@link TransactionalBatchResponse#isSuccessStatusCode} on the response returned to ensure that the + * transactional batch succeeded. + */ + @Beta(Beta.SinceVersion.V4_7_0) + public TransactionalBatchResponse executeTransactionalBatch(TransactionalBatch transactionalBatch) { + return this.blockBatchResponse(asyncContainer.executeTransactionalBatch(transactionalBatch)); + } + + /** + * Executes the transactional batch. + * + * @param transactionalBatch Batch having list of operation and partition key which will be executed by this container. + * @param requestOptions Options that apply specifically to batch request. + * + * @return A TransactionalBatchResponse which contains details of execution of the transactional batch. + *

+ * If the transactional batch executes successfully, the value returned by {@link + * TransactionalBatchResponse#getStatusCode} on the response returned will be set to 200}. + *

+ * If an operation within the transactional batch fails during execution, no changes from the batch will be + * committed and the status of the failing operation is made available by {@link + * TransactionalBatchResponse#getStatusCode} or by the exception. To obtain information about the operations + * that failed in case of some user error like conflict, not found etc, the response can be enumerated. + * This returns {@link TransactionalBatchOperationResult} instances corresponding to each operation in the + * transactional batch in the order they were added to the transactional batch. + * For a result corresponding to an operation within the transactional batch, use + * {@link TransactionalBatchOperationResult#getStatusCode} + * to access the status of the operation. If the operation was not executed or it was aborted due to the failure of + * another operation within the transactional batch, the value of this field will be 424; + * for the operation that caused the batch to abort, the value of this field + * will indicate the cause of failure. + *

+ * If there are issues such as request timeouts, Gone, session not available, network failure + * or if the service somehow returns 5xx then this will throw an exception instead of returning a TransactionalBatchResponse. + *

+ * Use {@link TransactionalBatchResponse#isSuccessStatusCode} on the response returned to ensure that the + * transactional batch succeeded. + */ + @Beta(Beta.SinceVersion.V4_7_0) + public TransactionalBatchResponse executeTransactionalBatch( + TransactionalBatch transactionalBatch, + TransactionalBatchRequestOptions requestOptions) { + + return this.blockBatchResponse(asyncContainer.executeTransactionalBatch(transactionalBatch, requestOptions)); + } + /** * Gets the Cosmos scripts using the current container as context. * diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosItemOperation.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosItemOperation.java new file mode 100644 index 000000000000..d7c78a07c35a --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosItemOperation.java @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos; + +import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.util.Beta; + +@Beta(Beta.SinceVersion.V4_7_0) +public interface CosmosItemOperation { + String getId(); + + PartitionKey getPartitionKeyValue(); + + CosmosItemOperationType getOperationType(); + + T getItem(); +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosItemOperationType.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosItemOperationType.java new file mode 100644 index 000000000000..7054d2839066 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosItemOperationType.java @@ -0,0 +1,27 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos; + +import com.azure.cosmos.implementation.batch.BatchRequestResponseConstant; +import com.azure.cosmos.util.Beta; + +@Beta(Beta.SinceVersion.V4_7_0) +public enum CosmosItemOperationType { + + CREATE(BatchRequestResponseConstant.OPERATION_CREATE), + DELETE(BatchRequestResponseConstant.OPERATION_DELETE), + READ(BatchRequestResponseConstant.OPERATION_READ), + REPLACE(BatchRequestResponseConstant.OPERATION_REPLACE), + UPSERT(BatchRequestResponseConstant.OPERATION_UPSERT); + + CosmosItemOperationType(String operationValue) { + this.operationValue = operationValue; + } + + String getOperationValue() { + return operationValue; + } + + private final String operationValue; +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/TransactionalBatch.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/TransactionalBatch.java new file mode 100644 index 000000000000..697b008f2211 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/TransactionalBatch.java @@ -0,0 +1,342 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos; + +import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList; +import com.azure.cosmos.implementation.batch.ItemBatchOperation; +import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.util.Beta; + +import java.util.ArrayList; +import java.util.List; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; + +/** + * Represents a batch of operations against items with the same {@link PartitionKey} in a container that will be performed + * in a transactional manner at the Azure Cosmos DB service. + *

+ * Use {@link TransactionalBatch#createTransactionalBatch(PartitionKey)} to create an instance of TransactionalBatch. + * Example + * This example atomically modifies a set of items as a batch. + *

{@code
+ * public class ToDoActivity {
+ *     public final String type;
+ *     public final String id;
+ *     public final String status;
+ *     public ToDoActivity(String type, String id, String status) {
+ *         this.type = type;
+ *         this.id = id;
+ *         this.status = status;
+ *     }
+ * }
+ *
+ * String activityType = "personal";
+ *
+ * ToDoActivity test1 = new ToDoActivity(activityType, "learning", "ToBeDone");
+ * ToDoActivity test2 = new ToDoActivity(activityType, "shopping", "Done");
+ * ToDoActivity test3 = new ToDoActivity(activityType, "swimming", "ToBeDone");
+ *
+ * TransactionalBatch batch = TransactionalBatch.createTransactionalBatch(new Cosmos.PartitionKey(activityType));
+ * batch.createItemOperation(test1);
+ * batch.replaceItemOperation(test2.id, test2);
+ * batch.upsertItemOperation(test3);
+ * batch.deleteItemOperation("reading");
+ *
+ * TransactionalBatchResponse response = container.executeTransactionalBatch(batch);
+ *
+ * if (!response.isSuccessStatusCode()) {
+ *      // Handle and log exception
+ *      return;
+ * }
+ *
+ * // Look up interested results - e.g., via typed access on operation results
+ *
+ * TransactionalBatchOperationResult result = response.get(0);
+ * ToDoActivity readActivity = result.getItem(ToDoActivity.class);
+ *
+ * }
+ * + * Example + *

This example atomically reads a set of items as a batch. + *

{@code
+ * String activityType = "personal";
+ *
+ * TransactionalBatch batch = TransactionalBatch.createTransactionalBatch(new Cosmos.PartitionKey(activityType));
+ * batch.readItemOperation("playing");
+ * batch.readItemOperation("walking");
+ * batch.readItemOperation("jogging");
+ * batch.readItemOperation("running");
+ *
+ * TransactionalBatchResponse response = container.executeTransactionalBatch(batch);
+ * List resultItems = new ArrayList();
+ *
+ * for (int i = 0; i < response.size(); i++) {
+ *     TransactionalBatchOperationResult result = response.get(0);
+ *     resultItems.add(result.getItem(ToDoActivity.class));
+ * }
+ *
+ * }
+ *

+ * See: + * Limits on TransactionalBatch requests. + */ +@Beta(Beta.SinceVersion.V4_7_0) +public final class TransactionalBatch { + + private final List> operations; + private final PartitionKey partitionKey; + + TransactionalBatch(PartitionKey partitionKey) { + checkNotNull(partitionKey, "expected non-null partitionKey"); + + this.operations = new ArrayList<>(); + this.partitionKey = partitionKey; + } + + /** + * Initializes a new instance of {@link TransactionalBatch} + * that will contain operations to be performed across multiple items in the container with the provided partition + * key in a transactional manner + * + * @param partitionKey the partition key for all items in the batch. + * + * @return A new instance of {@link TransactionalBatch}. + */ + public static TransactionalBatch createTransactionalBatch(PartitionKey partitionKey) { + return new TransactionalBatch(partitionKey); + } + + /** + * Adds an operation to create an item into the batch. + * + * @param item A JSON serializable object that must contain an id property. + * @param The type of item to be created. + * + * @return The transactional batch instance with the operation added. + */ + public CosmosItemOperation createItemOperation(T item) { + checkNotNull(item, "expected non-null item"); + return this.createItemOperation(item, new TransactionalBatchItemRequestOptions()); + } + + /** + * Adds an operation to create an item into the batch. + * + * @param The type of item to be created. + * + * @param item A JSON serializable object that must contain an id property. + * @param requestOptions The options for the item request. + * + * @return The transactional batch instance with the operation added. + */ + public CosmosItemOperation createItemOperation(T item, TransactionalBatchItemRequestOptions requestOptions) { + + checkNotNull(item, "expected non-null item"); + if (requestOptions == null) { + requestOptions = new TransactionalBatchItemRequestOptions(); + } + + ItemBatchOperation operation = new ItemBatchOperation( + CosmosItemOperationType.CREATE, + null, + this.getPartitionKeyValue(), + requestOptions.toRequestOptions(), + item + ); + + this.operations.add(operation); + + return operation; + } + + /** + * Adds an operation to delete an item into the batch. + * + * @param id The unique id of the item. + * + * @return The transactional batch instance with the operation added. + */ + public CosmosItemOperation deleteItemOperation(String id) { + checkNotNull(id, "expected non-null id"); + return this.deleteItemOperation(id, new TransactionalBatchItemRequestOptions()); + } + + /** + * Adds an operation to delete an item into the batch. + * + * @param id The unique id of the item. + * @param requestOptions The options for the item request. + * + * @return The transactional batch instance with the operation added. + */ + public CosmosItemOperation deleteItemOperation(String id, TransactionalBatchItemRequestOptions requestOptions) { + + checkNotNull(id, "expected non-null id"); + if (requestOptions == null) { + requestOptions = new TransactionalBatchItemRequestOptions(); + } + + ItemBatchOperation operation = new ItemBatchOperation<>( + CosmosItemOperationType.DELETE, + id, + this.getPartitionKeyValue(), + requestOptions.toRequestOptions(), + null + ); + + this.operations.add(operation); + + return operation; + } + + /** + * Adds an operation to read an item into the batch. + * + * @param id The unique id of the item. + * + * @return The transactional batch instance with the operation added. + */ + public CosmosItemOperation readItemOperation(String id) { + checkNotNull(id, "expected non-null id"); + return this.readItemOperation(id, new TransactionalBatchItemRequestOptions()); + } + + /** + * Adds an operation to read an item into the batch. + * + * @param id The unique id of the item. + * @param requestOptions The options for the item request. + * + * @return The transactional batch instance with the operation added. + */ + public CosmosItemOperation readItemOperation(String id, TransactionalBatchItemRequestOptions requestOptions) { + + checkNotNull(id, "expected non-null id"); + if (requestOptions == null) { + requestOptions = new TransactionalBatchItemRequestOptions(); + } + + ItemBatchOperation operation = new ItemBatchOperation<>( + CosmosItemOperationType.READ, + id, + this.getPartitionKeyValue(), + requestOptions.toRequestOptions(), + null + ); + + this.operations.add(operation); + + return operation; + } + + /** + * Adds an operation to replace an item into the batch. + * + * @param id The unique id of the item. + * @param item A JSON serializable object that must contain an id property. + * @param The type of item to be replaced. + * + * @return The transactional batch instance with the operation added. + */ + public CosmosItemOperation replaceItemOperation(String id, T item) { + checkNotNull(id, "expected non-null id"); + checkNotNull(item, "expected non-null item"); + return this.replaceItemOperation(id, item, new TransactionalBatchItemRequestOptions()); + } + + /** + * Adds an operation to replace an item into the batch. + * + * @param The type of item to be replaced. + * + * @param id The unique id of the item. + * @param item A JSON serializable object that must contain an id property. + * @param requestOptions The options for the item request. + * + * @return The transactional batch instance with the operation added. + */ + public CosmosItemOperation replaceItemOperation( + String id, T item, TransactionalBatchItemRequestOptions requestOptions) { + + checkNotNull(id, "expected non-null id"); + checkNotNull(item, "expected non-null item"); + if (requestOptions == null) { + requestOptions = new TransactionalBatchItemRequestOptions(); + } + + ItemBatchOperation operation = new ItemBatchOperation( + CosmosItemOperationType.REPLACE, + id, + this.getPartitionKeyValue(), + requestOptions.toRequestOptions(), + item + ); + + this.operations.add(operation); + + return operation; + } + + /** + * Adds an operation to upsert an item into the batch. + * + * @param item A JSON serializable object that must contain an id property. + * @param The type of item to be upserted. + * + * @return The transactional batch instance with the operation added. + */ + public CosmosItemOperation upsertItemOperation(T item) { + checkNotNull(item, "expected non-null item"); + return this.upsertItemOperation(item, new TransactionalBatchItemRequestOptions()); + } + + /** + * Adds an operation to upsert an item into the batch. + * + * @param The type of item to be upserted. + * + * @param item A JSON serializable object that must contain an id property. + * @param requestOptions The options for the item request. + * + * @return The transactional batch instance with the operation added. + */ + public CosmosItemOperation upsertItemOperation(T item, TransactionalBatchItemRequestOptions requestOptions) { + + checkNotNull(item, "expected non-null item"); + if (requestOptions == null) { + requestOptions = new TransactionalBatchItemRequestOptions(); + } + + ItemBatchOperation operation = new ItemBatchOperation( + CosmosItemOperationType.UPSERT, + null, + this.getPartitionKeyValue(), + requestOptions.toRequestOptions(), + item + ); + + this.operations.add(operation); + + return operation; + } + + /** + * Return the list of operation in an unmodifiable instance so no one can change it in the down path. + * + * @return The list of operations which are to be executed. + */ + public List getOperations() { + return UnmodifiableList.unmodifiableList(operations); + } + + /** + * Return the partition key for this batch. + * + * @return The partition key for this batch. + */ + public PartitionKey getPartitionKeyValue() { + return partitionKey; + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/TransactionalBatchItemRequestOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/TransactionalBatchItemRequestOptions.java new file mode 100644 index 000000000000..dde32da47d0e --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/TransactionalBatchItemRequestOptions.java @@ -0,0 +1,63 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos; + +import com.azure.cosmos.implementation.RequestOptions; +import com.azure.cosmos.util.Beta; + +/** + * Encapsulates options that can be specified for an operation within a {@link TransactionalBatch}. + */ +@Beta(Beta.SinceVersion.V4_7_0) +public final class TransactionalBatchItemRequestOptions { + private String ifMatchETag; + private String ifNoneMatchETag; + + /** + * Gets the If-Match (ETag) associated with the operation in TransactionalBatch. + * + * @return ifMatchETag the ifMatchETag associated with the request. + */ + public String getIfMatchETag() { + return this.ifMatchETag; + } + + /** + * Sets the If-Match (ETag) associated with the operation in TransactionalBatch. + * + * @param ifMatchETag the ifMatchETag associated with the request. + * @return the current request options + */ + public TransactionalBatchItemRequestOptions setIfMatchETag(final String ifMatchETag) { + this.ifMatchETag = ifMatchETag; + return this; + } + + /** + * Gets the If-None-Match (ETag) associated with the request in operation in TransactionalBatch. + * + * @return the ifNoneMatchETag associated with the request. + */ + public String getIfNoneMatchETag() { + return this.ifNoneMatchETag; + } + + /** + * Sets the If-None-Match (ETag) associated with the request in operation in TransactionalBatch. + * + * @param ifNoneMatchEtag the ifNoneMatchETag associated with the request. + * @return the current request options + */ + public TransactionalBatchItemRequestOptions setIfNoneMatchETag(final String ifNoneMatchEtag) { + this.ifNoneMatchETag = ifNoneMatchEtag; + return this; + } + + RequestOptions toRequestOptions() { + final RequestOptions requestOptions = new RequestOptions(); + requestOptions.setIfMatchETag(getIfMatchETag()); + requestOptions.setIfNoneMatchETag(getIfNoneMatchETag()); + return requestOptions; + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/TransactionalBatchOperationResult.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/TransactionalBatchOperationResult.java new file mode 100644 index 000000000000..63c5f95c062a --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/TransactionalBatchOperationResult.java @@ -0,0 +1,139 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos; + +import com.azure.cosmos.implementation.JsonSerializable; +import com.azure.cosmos.util.Beta; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.time.Duration; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; + +/** + * Represents a result for a specific operation that was part of a {@link TransactionalBatch} request. + */ +@Beta(Beta.SinceVersion.V4_7_0) +public final class TransactionalBatchOperationResult { + + private final String eTag; + private final double requestCharge; + private final ObjectNode resourceObject; + private final int statusCode; + private final Duration retryAfter; + private final int subStatusCode; + private final CosmosItemOperation cosmosItemOperation; + + /** + * Initializes a new instance of the {@link TransactionalBatchOperationResult} class. + */ + TransactionalBatchOperationResult(String eTag, + double requestCharge, + ObjectNode resourceObject, + int statusCode, + Duration retryAfter, + int subStatusCode, + CosmosItemOperation cosmosItemOperation) { + checkNotNull(statusCode, "expected non-null statusCode"); + checkNotNull(cosmosItemOperation, "expected non-null cosmosItemOperation"); + + this.eTag = eTag; + this.requestCharge = requestCharge; + this.resourceObject = resourceObject; + this.statusCode = statusCode; + this.retryAfter = retryAfter; + this.subStatusCode = subStatusCode; + this.cosmosItemOperation = cosmosItemOperation; + } + + /** + * Gets the entity tag associated with the current item. + * + * ETags are used for concurrency checking when updating resources. + * + * @return Entity tag associated with the current item. + */ + public String getETag() { + return this.eTag; + } + + /** + * Gets the request charge as request units (RU) consumed by the current operation. + *

+ * For more information about the RU and factors that can impact the effective charges please visit + * Request Units in Azure Cosmos DB + * + * @return the request charge. + */ + public double getRequestCharge() { + return this.requestCharge; + } + + /** + * Gets the item associated with the current result. + * + * @param the type parameter + * + * @param type class type for which deserialization is needed. + * + * @return item associated with the current result. + */ + public T getItem(final Class type) { + T item = null; + + if (this.getResourceObject() != null) { + item = new JsonSerializable(this.getResourceObject()).toObject(type); + } + + return item; + } + + /** + * Gets retry after. + * + * @return the retry after + */ + public Duration getRetryAfterDuration() { + return this.retryAfter; + } + + /** + * Gets sub status code associated with the current result. + * + * @return the sub status code + */ + public int getSubStatusCode() { + return this.subStatusCode; + } + + /** + * Gets a value indicating whether the current operation completed successfully. + * + * @return {@code true} if the current operation completed successfully; {@code false} otherwise. + */ + public boolean isSuccessStatusCode() { + return 200 <= this.statusCode && this.statusCode <= 299; + } + + /** + * Gets the HTTP status code associated with the current result. + * + * @return the status code. + */ + public int getStatusCode() { + return this.statusCode; + } + + ObjectNode getResourceObject() { + return resourceObject; + } + + /** + * Gets the original operation for this result. + * + * @return the CosmosItemOperation. + */ + public CosmosItemOperation getOperation() { + return cosmosItemOperation; + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/TransactionalBatchRequestOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/TransactionalBatchRequestOptions.java new file mode 100644 index 000000000000..358e4bc594e5 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/TransactionalBatchRequestOptions.java @@ -0,0 +1,63 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos; + +import com.azure.cosmos.implementation.RequestOptions; +import com.azure.cosmos.util.Beta; + +/** + * Encapsulates options that can be specified for a {@link TransactionalBatch}. + */ +@Beta(Beta.SinceVersion.V4_7_0) +public final class TransactionalBatchRequestOptions { + private ConsistencyLevel consistencyLevel; + private String sessionToken; + + /** + * Gets the consistency level required for the request. + * + * @return the consistency level. + */ + ConsistencyLevel getConsistencyLevel() { + return consistencyLevel; + } + + /** + * Sets the consistency level required for the request. + * + * @param consistencyLevel the consistency level. + * @return the TransactionalBatchRequestOptions. + */ + TransactionalBatchRequestOptions setConsistencyLevel(ConsistencyLevel consistencyLevel) { + this.consistencyLevel = consistencyLevel; + return this; + } + + /** + * Gets the token for use with session consistency. + * + * @return the session token. + */ + public String getSessionToken() { + return sessionToken; + } + + /** + * Sets the token for use with session consistency. + * + * @param sessionToken the session token. + * @return the TransactionalBatchRequestOptions. + */ + public TransactionalBatchRequestOptions setSessionToken(String sessionToken) { + this.sessionToken = sessionToken; + return this; + } + + RequestOptions toRequestOptions() { + final RequestOptions requestOptions = new RequestOptions(); + requestOptions.setConsistencyLevel(getConsistencyLevel()); + requestOptions.setSessionToken(sessionToken); + return requestOptions; + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/TransactionalBatchResponse.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/TransactionalBatchResponse.java new file mode 100644 index 000000000000..31176f241b6a --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/TransactionalBatchResponse.java @@ -0,0 +1,185 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos; + +import com.azure.cosmos.implementation.batch.BatchExecUtils; +import com.azure.cosmos.util.Beta; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; + +/** + * Response of a {@link TransactionalBatch} request. + */ +@Beta(Beta.SinceVersion.V4_7_0) +public class TransactionalBatchResponse { + + private final Map responseHeaders; + private final int statusCode; + private final String errorMessage; + private final List results; + private final int subStatusCode; + private final CosmosDiagnostics cosmosDiagnostics; + + /** + * Initializes a new instance of the {@link TransactionalBatchResponse} class. + * + * @param statusCode the response status code. + * @param subStatusCode the response sub-status code. + * @param errorMessage an error message or {@code null}. + * @param responseHeaders the response http headers + * @param cosmosDiagnostics the diagnostic + */ + TransactionalBatchResponse( + final int statusCode, + final int subStatusCode, + final String errorMessage, + final Map responseHeaders, + final CosmosDiagnostics cosmosDiagnostics) { + + checkNotNull(statusCode, "expected non-null statusCode"); + checkNotNull(responseHeaders, "expected non-null responseHeaders"); + + this.statusCode = statusCode; + this.subStatusCode = subStatusCode; + this.errorMessage = errorMessage; + this.responseHeaders = responseHeaders; + this.cosmosDiagnostics = cosmosDiagnostics; + this.results = new ArrayList<>(); + } + + /** + * Gets the diagnostics information for the current request to Azure Cosmos DB service. + * + * @return diagnostics information for the current request to Azure Cosmos DB service. + */ + public CosmosDiagnostics getDiagnostics() { + return cosmosDiagnostics; + } + + /** + * Gets the number of operation results. + * + * @return the number of operations results in this response. + */ + public int size() { + return this.results == null ? 0 : this.results.size(); + } + + /** + * Returns a value indicating whether the batch was successfully processed. + * + * @return a value indicating whether the batch was successfully processed. + */ + public boolean isSuccessStatusCode() { + return this.statusCode >= 200 && this.statusCode <= 299; + } + + /** + * Gets the activity ID that identifies the server request made to execute the batch. + * + * @return the activity ID that identifies the server request made to execute the batch. + */ + public String getActivityId() { + return BatchExecUtils.getActivityId(this.responseHeaders); + } + + /** + * Gets the reason for the failure of the batch request, if any, or {@code null}. + * + * @return the reason for the failure of the batch request, if any, or {@code null}. + */ + public String getErrorMessage() { + return this.errorMessage; + } + + /** + * Gets the request charge as request units (RU) consumed by the batch operation. + *

+ * For more information about the RU and factors that can impact the effective charges please visit + * Request Units in Azure Cosmos DB + * + * @return the request charge. + */ + public double getRequestCharge() { + return BatchExecUtils.getRequestCharge(this.responseHeaders); + } + + /** + * Gets the HTTP status code associated with the response. + * + * @return the status code. + */ + public int getStatusCode() { + return this.statusCode; + } + + /** + * Gets the token used for managing client's consistency requirements. + * + * @return the session token. + */ + public String getSessionToken() { + return BatchExecUtils.getSessionToken(this.responseHeaders); + } + + /** + * Gets the headers associated with the response. + * + * @return the response headers. + */ + public Map getResponseHeaders() { + return this.responseHeaders; + } + + /** + * Gets the amount of time to wait before retrying this or any other request due to throttling. + * + * @return the amount of time to wait before retrying this or any other request due to throttling. + */ + public Duration getRetryAfterDuration() { + return BatchExecUtils.getRetryAfterDuration(this.responseHeaders); + } + + /** + * Gets the HTTP sub status code associated with the response. + * + * @return the sub status code. + */ + public int getSubStatusCode() { + return this.subStatusCode; + } + + /** + * Get all the results of the operations in a batch in an unmodifiable instance so no one can + * change it in the down path. + * + * @return Results of operations in a batch. + */ + public List getResults() { + return Collections.unmodifiableList(this.results); + } + + /** + * Gets the end-to-end request latency for the current request to Azure Cosmos DB service. + * + * @return end-to-end request latency for the current request to Azure Cosmos DB service. + */ + public Duration getDuration() { + if (cosmosDiagnostics == null) { + return Duration.ZERO; + } + + return this.cosmosDiagnostics.getDuration(); + } + + void addAll(List collection) { + this.results.addAll(collection); + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/AsyncDocumentClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/AsyncDocumentClient.java index a45980a7e043..2837d33269c7 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/AsyncDocumentClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/AsyncDocumentClient.java @@ -5,6 +5,8 @@ import com.azure.core.credential.AzureKeyCredential; import com.azure.core.credential.TokenCredential; import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.implementation.batch.ServerBatchRequest; +import com.azure.cosmos.TransactionalBatchResponse; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.models.CosmosItemIdentity; import com.azure.cosmos.models.CosmosQueryRequestOptions; @@ -805,6 +807,24 @@ Flux> queryStoredProcedures(String collectionLink, Mono executeStoredProcedure(String storedProcedureLink, RequestOptions options, List procedureParams); + /** + * Executes a batch request + *

+ * After subscription the operation will be performed. + * The {@link Mono} upon successful completion will contain a batch response which will have individual responses. + * In case of failure the {@link Mono} will error. + * + * @param collectionLink the link to the parent document collection. + * @param serverBatchRequest the batch request with the content and flags. + * @param options the request options. + * @param disableAutomaticIdGeneration the flag for disabling automatic id generation. + * @return a {@link Mono} containing the transactionalBatchResponse response which results of all operations. + */ + Mono executeBatchRequest(String collectionLink, + ServerBatchRequest serverBatchRequest, + RequestOptions options, + boolean disableAutomaticIdGeneration); + /** * Creates a trigger. *

diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java index c87604884bf0..e9ae3b407e2c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java @@ -249,6 +249,11 @@ public static class HttpHeaders { public static final String API_TYPE = "x-ms-cosmos-apitype"; public static final String QUERY_METRICS = "x-ms-documentdb-query-metrics"; + // Batch operations + public static final String IS_BATCH_ATOMIC = "x-ms-cosmos-batch-atomic"; + public static final String IS_BATCH_ORDERED = "x-ms-cosmos-batch-ordered"; + public static final String IS_BATCH_REQUEST = "x-ms-cosmos-is-batch-request"; + public static final String SHOULD_BATCH_CONTINUE_ON_ERROR = "x-ms-cosmos-batch-continue-on-error"; } public static class A_IMHeaderValues { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/OperationType.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/OperationType.java index 01bc4e85ec03..d3fda30412e7 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/OperationType.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/OperationType.java @@ -10,6 +10,7 @@ public enum OperationType { AbortPartitionMigration, AbortSplit, AddComputeGatewayRequestCharges, + Batch, BatchApply, BatchReportThroughputUtilization, CompletePartitionMigration, @@ -49,6 +50,7 @@ public boolean isWriteOperation() { this == ExecuteJavaScript || this == Replace || this == Upsert || - this == Update; + this == Update || + this == Batch; } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 6f9500200ca5..616ddec441d7 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -11,6 +11,10 @@ import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.CosmosDiagnostics; import com.azure.cosmos.DirectConnectionConfig; +import com.azure.cosmos.implementation.batch.BatchResponseParser; +import com.azure.cosmos.implementation.batch.ServerBatchRequest; +import com.azure.cosmos.implementation.batch.SinglePartitionKeyServerBatchRequest; +import com.azure.cosmos.TransactionalBatchResponse; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.implementation.caches.RxClientCollectionCache; import com.azure.cosmos.implementation.caches.RxCollectionCache; @@ -78,6 +82,8 @@ import static com.azure.cosmos.BridgeInternal.toFeedResponsePage; import static com.azure.cosmos.BridgeInternal.toResourceResponse; import static com.azure.cosmos.BridgeInternal.toStoredProcedureResponse; +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument; +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; import static com.azure.cosmos.models.ModelBridgeInternal.serializeJsonToByteBuffer; import static com.azure.cosmos.models.ModelBridgeInternal.toDatabaseAccount; @@ -1255,6 +1261,84 @@ private Mono getCreateDocumentRequest(DocumentClientRe return addPartitionKeyInformation(request, content, document, options, collectionObs); } + private Mono getBatchDocumentRequest(DocumentClientRetryPolicy requestRetryPolicy, + String documentCollectionLink, + ServerBatchRequest serverBatchRequest, + RequestOptions options, + boolean disableAutomaticIdGeneration) { + + checkArgument(StringUtils.isNotEmpty(documentCollectionLink), "expected non empty documentCollectionLink"); + checkNotNull(serverBatchRequest, "expected non null serverBatchRequest"); + + Instant serializationStartTimeUTC = Instant.now(); + ByteBuffer content = ByteBuffer.wrap(Utils.getUTF8Bytes(serverBatchRequest.getRequestBody())); + Instant serializationEndTimeUTC = Instant.now(); + + SerializationDiagnosticsContext.SerializationDiagnostics serializationDiagnostics = new SerializationDiagnosticsContext.SerializationDiagnostics( + serializationStartTimeUTC, + serializationEndTimeUTC, + SerializationDiagnosticsContext.SerializationType.ITEM_SERIALIZATION); + + String path = Utils.joinPath(documentCollectionLink, Paths.DOCUMENTS_PATH_SEGMENT); + Map requestHeaders = this.getRequestHeaders(options, ResourceType.Document, OperationType.Batch); + + RxDocumentServiceRequest request = RxDocumentServiceRequest.create( + this, + OperationType.Batch, + ResourceType.Document, + path, + requestHeaders, + options, + content); + + if (requestRetryPolicy != null) { + requestRetryPolicy.onBeforeSendRequest(request); + } + + SerializationDiagnosticsContext serializationDiagnosticsContext = BridgeInternal.getSerializationDiagnosticsContext(request.requestContext.cosmosDiagnostics); + if (serializationDiagnosticsContext != null) { + serializationDiagnosticsContext.addSerializationDiagnostics(serializationDiagnostics); + } + + Mono> collectionObs = + this.collectionCache.resolveCollectionAsync(BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics), request); + + return collectionObs.map((Utils.ValueHolder collectionValueHolder) -> { + addBatchHeaders(request, serverBatchRequest, collectionValueHolder.v); + return request; + }); + } + + private RxDocumentServiceRequest addBatchHeaders(RxDocumentServiceRequest request, + ServerBatchRequest serverBatchRequest, + DocumentCollection collection) { + + if(serverBatchRequest instanceof SinglePartitionKeyServerBatchRequest) { + + PartitionKey partitionKey = ((SinglePartitionKeyServerBatchRequest) serverBatchRequest).getPartitionKeyValue(); + PartitionKeyInternal partitionKeyInternal; + + if (partitionKey.equals(PartitionKey.NONE)) { + PartitionKeyDefinition partitionKeyDefinition = collection.getPartitionKey(); + partitionKeyInternal = ModelBridgeInternal.getNonePartitionKey(partitionKeyDefinition); + } else { + // Partition key is always non-null + partitionKeyInternal = BridgeInternal.getPartitionKeyInternal(partitionKey); + } + + request.setPartitionKeyInternal(partitionKeyInternal); + request.getHeaders().put(HttpConstants.HttpHeaders.PARTITION_KEY, Utils.escapeNonAscii(partitionKeyInternal.toJson())); + } else { + throw new UnsupportedOperationException("Unknown Server request."); + } + + request.getHeaders().put(HttpConstants.HttpHeaders.IS_BATCH_REQUEST, Boolean.TRUE.toString()); + request.getHeaders().put(HttpConstants.HttpHeaders.IS_BATCH_ATOMIC, String.valueOf(serverBatchRequest.isAtomicBatch())); + request.getHeaders().put(HttpConstants.HttpHeaders.SHOULD_BATCH_CONTINUE_ON_ERROR, String.valueOf(serverBatchRequest.isShouldContinueOnError())); + + return request; + } + private Mono populateHeaders(RxDocumentServiceRequest request, RequestVerb httpMethod) { request.getHeaders().put(HttpConstants.HttpHeaders.X_DATE, Utils.nowAsRFC1123()); if (this.masterKeyOrResourceToken != null || this.resourceTokensMap != null @@ -2342,6 +2426,15 @@ public Mono executeStoredProcedure(String storedProcedu return ObservableHelper.inlineIfPossibleAsObs(() -> executeStoredProcedureInternal(storedProcedureLink, options, procedureParams, documentClientRetryPolicy), documentClientRetryPolicy); } + @Override + public Mono executeBatchRequest(String collectionLink, + ServerBatchRequest serverBatchRequest, + RequestOptions options, + boolean disableAutomaticIdGeneration) { + DocumentClientRetryPolicy documentClientRetryPolicy = this.resetSessionTokenRetryPolicy.getRequestPolicy(); + return ObservableHelper.inlineIfPossibleAsObs(() -> executeBatchRequestInternal(collectionLink, serverBatchRequest, options, documentClientRetryPolicy, disableAutomaticIdGeneration), documentClientRetryPolicy); + } + private Mono executeStoredProcedureInternal(String storedProcedureLink, RequestOptions options, List procedureParams, DocumentClientRetryPolicy retryPolicy) { @@ -2375,6 +2468,29 @@ private Mono executeStoredProcedureInternal(String stor } } + private Mono executeBatchRequestInternal(String collectionLink, + ServerBatchRequest serverBatchRequest, + RequestOptions options, + DocumentClientRetryPolicy requestRetryPolicy, + boolean disableAutomaticIdGeneration) { + + try { + logger.debug("Executing a Batch request with number of operations {}", serverBatchRequest.getOperations().size()); + + Mono requestObs = getBatchDocumentRequest(requestRetryPolicy, collectionLink, serverBatchRequest, options, disableAutomaticIdGeneration); + Mono responseObservable = requestObs.flatMap(request -> { + return create(request, requestRetryPolicy); + }); + + return responseObservable + .map(serviceResponse -> BatchResponseParser.fromDocumentServiceResponse(serviceResponse, serverBatchRequest, true)); + + } catch (Exception ex) { + logger.debug("Failure in executing a batch due to [{}]", ex.getMessage(), ex); + return Mono.error(ex); + } + } + @Override public Mono> createTrigger(String collectionLink, Trigger trigger, RequestOptions options) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceResponse.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceResponse.java index dc3b0ceb8d02..face2edc181d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceResponse.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceResponse.java @@ -182,7 +182,7 @@ private String getOwnerFullName() { return null; } - CosmosDiagnostics getCosmosDiagnostics() { + public CosmosDiagnostics getCosmosDiagnostics() { if (this.storeResponse == null) { return null; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java index ad481d3d22fd..d8e881dc5cb9 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java @@ -351,6 +351,7 @@ private void validateOrThrow(RxDocumentServiceRequest request, private Mono invokeAsyncInternal(RxDocumentServiceRequest request) { switch (request.getOperationType()) { case Create: + case Batch: return this.create(request); case Upsert: return this.upsert(request); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/TracerProvider.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/TracerProvider.java index 87c51d14412a..3887ffc7557b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/TracerProvider.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/TracerProvider.java @@ -5,6 +5,7 @@ import com.azure.core.util.Context; import com.azure.core.util.tracing.Tracer; import com.azure.cosmos.CosmosException; +import com.azure.cosmos.TransactionalBatchResponse; import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.CosmosResponse; import reactor.core.publisher.Mono; @@ -108,6 +109,15 @@ public > Mono traceEnabledCosmosResponsePublisher (T response) -> response.getStatusCode()); } + public Mono traceEnabledBatchResponsePublisher(Mono resultPublisher, + Context context, + String spanName, + String databaseId, + String endpoint) { + return traceEnabledPublisher(resultPublisher, context, spanName, databaseId, endpoint, + TransactionalBatchResponse::getStatusCode); + } + public Mono> traceEnabledCosmosItemResponsePublisher(Mono> resultPublisher, Context context, String spanName, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Utils.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Utils.java index da3eb3ed1b3f..ed32bbd695bc 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Utils.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Utils.java @@ -351,7 +351,7 @@ public static boolean isCollectionChild(ResourceType type) { public static boolean isWriteOperation(OperationType operationType) { return operationType == OperationType.Create || operationType == OperationType.Upsert || operationType == OperationType.Delete || operationType == OperationType.Replace - || operationType == OperationType.ExecuteJavaScript; + || operationType == OperationType.ExecuteJavaScript || operationType == OperationType.Batch; } public static boolean isFeedRequest(OperationType requestOperationType) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BatchExecUtils.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BatchExecUtils.java new file mode 100644 index 000000000000..520794fb561e --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BatchExecUtils.java @@ -0,0 +1,89 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.batch; + +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Map; + +/** + * Util methods for batch requests/response. + */ +public final class BatchExecUtils { + + private final static Logger logger = LoggerFactory.getLogger(BatchExecUtils.class); + + public static Duration getRetryAfterDuration(Map responseHeaders) { + long retryIntervalInMilliseconds = 0; + + if (responseHeaders != null) { + String header = responseHeaders.get(HttpConstants.HttpHeaders.RETRY_AFTER_IN_MILLISECONDS); + + if (StringUtils.isNotEmpty(header)) { + try { + retryIntervalInMilliseconds = Long.parseLong(header); + } catch (NumberFormatException e) { + // If the value cannot be parsed as long, return 0. + } + } + } + + return Duration.ofMillis(retryIntervalInMilliseconds); + } + + public static String getSessionToken(Map responseHeaders) { + if (responseHeaders != null) { + return responseHeaders.get(HttpConstants.HttpHeaders.SESSION_TOKEN); + } + + return null; + } + + public static String getActivityId(Map responseHeaders) { + if (responseHeaders != null) { + return responseHeaders.get(HttpConstants.HttpHeaders.ACTIVITY_ID); + } + + return null; + } + + public static double getRequestCharge(Map responseHeaders) { + if (responseHeaders == null) { + return 0; + } + + final String value = responseHeaders.get(HttpConstants.HttpHeaders.REQUEST_CHARGE); + if (StringUtils.isEmpty(value)) { + return 0; + } + + try { + return Double.parseDouble(value); + } catch (NumberFormatException e) { + logger.warn("INVALID x-ms-request-charge value {}.", value); + return 0; + } + } + + public static int getSubStatusCode(Map responseHeaders) { + int code = HttpConstants.SubStatusCodes.UNKNOWN; + + if (responseHeaders != null) { + String subStatusString = responseHeaders.get(HttpConstants.HttpHeaders.SUB_STATUS); + if (StringUtils.isNotEmpty(subStatusString)) { + try { + code = Integer.parseInt(subStatusString); + } catch (NumberFormatException e) { + // If value cannot be parsed as Integer, return Unknown. + } + } + } + + return code; + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BatchExecutor.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BatchExecutor.java new file mode 100644 index 000000000000..91c68c39c753 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BatchExecutor.java @@ -0,0 +1,54 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.batch; + +import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.CosmosBridgeInternal; +import com.azure.cosmos.CosmosItemOperation; +import com.azure.cosmos.TransactionalBatch; +import com.azure.cosmos.TransactionalBatchRequestOptions; +import com.azure.cosmos.TransactionalBatchResponse; +import reactor.core.publisher.Mono; + +import java.util.List; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument; + +public final class BatchExecutor { + + private final CosmosAsyncContainer container; + private final TransactionalBatchRequestOptions options; + private final TransactionalBatch transactionalBatch; + + public BatchExecutor( + final CosmosAsyncContainer container, + final TransactionalBatch transactionalBatch, + final TransactionalBatchRequestOptions options) { + + this.container = container; + this.transactionalBatch = transactionalBatch; + this.options = options; + } + + /** + * Create a batch request from list of operations and executes it. + * + * @return Response from the server. + */ + public final Mono executeAsync() { + + List operations = this.transactionalBatch.getOperations(); + checkArgument(operations.size() > 0, "Number of operations should be more than 0."); + + final SinglePartitionKeyServerBatchRequest request = SinglePartitionKeyServerBatchRequest.createBatchRequest( + this.transactionalBatch.getPartitionKeyValue(), + operations); + request.setAtomicBatch(true); + request.setShouldContinueOnError(false); + + return CosmosBridgeInternal.getAsyncDocumentClient(container.getDatabase()) + .executeBatchRequest(BridgeInternal.getLink(container), request, BridgeInternal.toRequestOptions(options), false); + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BatchRequestResponseConstant.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BatchRequestResponseConstant.java new file mode 100644 index 000000000000..84429b20e295 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BatchRequestResponseConstant.java @@ -0,0 +1,42 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.batch; + +/** + * This contains all the extra constants needed for batch/bulk. This will be usefull even if Hybrid row comes in. + * This contains all the constants we have in Backend. Any addition to backend should be added here. + */ +public class BatchRequestResponseConstant { + + // Size limits: + public static final int MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES = 220201; + public static final int MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST = 100; + + static final String FIELD_OPERATION_TYPE = "operationType"; + static final String FIELD_RESOURCE_TYPE = "resourceType"; + static final String FIELD_TIME_TO_LIVE_IN_SECONDS = "timeToLiveInSeconds"; + static final String FIELD_ID = "id"; + static final String FIELD_INDEXING_DIRECTIVE = "indexingDirective"; + static final String FIELD_IF_MATCH = "ifMatch"; + static final String FIELD_IF_NONE_MATCH = "ifNoneMatch"; + static final String FIELD_PARTITION_KEY = "partitionKey"; + static final String FIELD_RESOURCE_BODY = "resourceBody"; + static final String FIELD_BINARY_ID = "binaryId"; + static final String FIELD_EFFECTIVE_PARTITIONKEY = "effectivePartitionKey"; + static final String FIELD_STATUS_CODE = "statusCode"; + static final String FIELD_SUBSTATUS_CODE = "subStatusCode"; + static final String FIELD_REQUEST_CHARGE = "requestCharge"; + static final String FIELD_RETRY_AFTER_MILLISECONDS = "retryAfterMilliseconds"; + static final String FIELD_ETAG = "eTag"; + static final String FIELD_MINIMAL_RETURN_PREFERENCE = "minimalReturnPreference"; + static final String FIELD_IS_CLIENTENCRYPTED = "isClientEncrypted"; + + // Batch supported operation type for json + public static final String OPERATION_CREATE = "Create"; + public static final String OPERATION_PATCH = "Patch"; + public static final String OPERATION_READ = "Read"; + public static final String OPERATION_UPSERT = "Upsert"; + public static final String OPERATION_DELETE = "Delete"; + public static final String OPERATION_REPLACE = "Replace"; +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BatchResponseParser.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BatchResponseParser.java new file mode 100644 index 000000000000..378561ae595c --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BatchResponseParser.java @@ -0,0 +1,228 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.batch; + +import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.CosmosItemOperation; +import com.azure.cosmos.TransactionalBatchOperationResult; +import com.azure.cosmos.TransactionalBatchResponse; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.JsonSerializable; +import com.azure.cosmos.implementation.RxDocumentServiceResponse; +import com.azure.cosmos.implementation.Utils; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.netty.handler.codec.http.HttpResponseStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkState; + +public final class BatchResponseParser { + + private final static Logger logger = LoggerFactory.getLogger(BatchResponseParser.class); + private final static char HYBRID_V1 = 129; + + /** Creates a transactional batch response from a documentServiceResponse. + * + * @param documentServiceResponse the {@link RxDocumentServiceResponse response message}. + * @param request the {@link ServerBatchRequest batch request} that produced {@code message}. + * @param shouldPromoteOperationStatus indicates whether the operation status should be promoted. + * + * @return the {@link TransactionalBatchResponse transactional batch response} created + * from {@link RxDocumentServiceResponse message} when the batch operation completes. + */ + public static TransactionalBatchResponse fromDocumentServiceResponse( + final RxDocumentServiceResponse documentServiceResponse, + final ServerBatchRequest request, + final boolean shouldPromoteOperationStatus) { + + TransactionalBatchResponse response = null; + final byte[] responseContent = documentServiceResponse.getResponseBodyAsByteArray(); + + if (responseContent != null && responseContent.length > 0) { + response = BatchResponseParser.populateFromResponseContent(documentServiceResponse, request, shouldPromoteOperationStatus); + + if (response == null) { + // Convert any payload read failures as InternalServerError + response = BridgeInternal.createTransactionBatchResponse( + HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), + HttpConstants.SubStatusCodes.UNKNOWN, + "ServerResponseDeserializationFailure", + documentServiceResponse.getResponseHeaders(), + documentServiceResponse.getCosmosDiagnostics()); + } + } + + int responseStatusCode = documentServiceResponse.getStatusCode(); + int responseSubStatusCode = BatchExecUtils.getSubStatusCode(documentServiceResponse.getResponseHeaders()); + + if (response == null) { + response = BridgeInternal.createTransactionBatchResponse( + responseStatusCode, + responseSubStatusCode, + null, + documentServiceResponse.getResponseHeaders(), + documentServiceResponse.getCosmosDiagnostics()); + } + + if (response.size() != request.getOperations().size()) { + if (responseStatusCode >= 200 && responseStatusCode <= 299) { + // Server should be guaranteeing number of results equal to operations when + // batch request is successful - so fail as InternalServerError if this is not the case. + response = BridgeInternal.createTransactionBatchResponse( + HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), + HttpConstants.SubStatusCodes.UNKNOWN, + "Invalid server response", + documentServiceResponse.getResponseHeaders(), + documentServiceResponse.getCosmosDiagnostics()); + } + + // When the overall response status code is TooManyRequests, propagate the RetryAfter into the individual operations. + Duration retryAfterDuration = Duration.ZERO; + if (responseStatusCode == HttpResponseStatus.TOO_MANY_REQUESTS.code()) { + retryAfterDuration = BatchExecUtils.getRetryAfterDuration(documentServiceResponse.getResponseHeaders()); + } + + BatchResponseParser.createAndPopulateResults(response, request.getOperations(), retryAfterDuration); + } + + checkState(response.size() == request.getOperations().size(), + "Number of responses should be equal to number of operations in request."); + + return response; + } + + private static TransactionalBatchResponse populateFromResponseContent( + final RxDocumentServiceResponse documentServiceResponse, + final ServerBatchRequest request, + final boolean shouldPromoteOperationStatus) { + + final List results = new ArrayList<>(request.getOperations().size()); + final byte[] responseContent = documentServiceResponse.getResponseBodyAsByteArray(); + + if (responseContent[0] != (byte)HYBRID_V1) { + // Read from a json response body. To enable hybrid row just complete the else part + final ObjectMapper mapper = Utils.getSimpleObjectMapper(); + + try { + final List cosmosItemOperations = request.getOperations(); + final ObjectNode[] objectNodes = mapper.readValue(responseContent, ObjectNode[].class); + + for (int index = 0; index < objectNodes.length; index++) { + ObjectNode objectInArray = objectNodes[index]; + + results.add( + BatchResponseParser.createBatchOperationResultFromJson(objectInArray, cosmosItemOperations.get(index))); + } + } catch (IOException ex) { + logger.error("Exception in parsing response", ex); + } + + } else { + // TODO(rakkuma): Implement hybrid row response parsing logic here. + // Issue: https://github.com/Azure/azure-sdk-for-java/issues/15856 + logger.error("Hybrid row is not implemented right now"); + return null; + } + + int responseStatusCode = documentServiceResponse.getStatusCode(); + int responseSubStatusCode = BatchExecUtils.getSubStatusCode(documentServiceResponse.getResponseHeaders()); + + // Status code of the exact operation which failed. + if (responseStatusCode == HttpResponseStatus.MULTI_STATUS.code() && shouldPromoteOperationStatus) { + for (TransactionalBatchOperationResult result : results) { + if (result.getStatusCode() != HttpResponseStatus.FAILED_DEPENDENCY.code() && + result.getStatusCode() >= 400) { + responseStatusCode = result.getStatusCode(); + responseSubStatusCode = result.getSubStatusCode(); + break; + } + } + } + + final TransactionalBatchResponse response = BridgeInternal.createTransactionBatchResponse( + responseStatusCode, + responseSubStatusCode, + null, + documentServiceResponse.getResponseHeaders(), + documentServiceResponse.getCosmosDiagnostics()); + + BridgeInternal.addTransactionBatchResultInResponse(response, results); + return response; + } + + /** + * Read batch operation result result. + * + * TODO(rakkuma): Similarly hybrid row result needs to be parsed. + * Issue: https://github.com/Azure/azure-sdk-for-java/issues/15856 + * + * @param objectNode having response for a single operation. + * + * @return the result + */ + private static TransactionalBatchOperationResult createBatchOperationResultFromJson( + ObjectNode objectNode, + CosmosItemOperation cosmosItemOperation) { + + final JsonSerializable jsonSerializable = new JsonSerializable(objectNode); + + final int statusCode = jsonSerializable.getInt(BatchRequestResponseConstant.FIELD_STATUS_CODE); + Integer subStatusCode = jsonSerializable.getInt(BatchRequestResponseConstant.FIELD_SUBSTATUS_CODE); + if (subStatusCode == null) { + subStatusCode = HttpConstants.SubStatusCodes.UNKNOWN; + } + + Double requestCharge = jsonSerializable.getDouble(BatchRequestResponseConstant.FIELD_REQUEST_CHARGE); + if (requestCharge == null) { + requestCharge = (double) 0; + } + + final String eTag = jsonSerializable.getString(BatchRequestResponseConstant.FIELD_ETAG); + final ObjectNode resourceBody = jsonSerializable.getObject(BatchRequestResponseConstant.FIELD_RESOURCE_BODY); + final Integer retryAfterMilliseconds = jsonSerializable.getInt(BatchRequestResponseConstant.FIELD_RETRY_AFTER_MILLISECONDS); + + return BridgeInternal.createTransactionBatchResult( + eTag, + requestCharge, + resourceBody, + statusCode, + retryAfterMilliseconds != null ? Duration.ofMillis(retryAfterMilliseconds) : Duration.ZERO, + subStatusCode, + cosmosItemOperation); + } + + /** + * Populate results to match number of operations to number of results in case of any error. + * + * @param response The transactionalBatchResponse in which to add the results + * @param operations List of operations for which the wrapper TransactionalBatchResponse is returned. + * @param retryAfterDuration retryAfterDuration. + * */ + private static void createAndPopulateResults(final TransactionalBatchResponse response, + final List operations, + final Duration retryAfterDuration) { + final List results = new ArrayList<>(operations.size()); + for (CosmosItemOperation cosmosItemOperation : operations) { + results.add( + BridgeInternal.createTransactionBatchResult( + null, + response.getRequestCharge(), + null, + response.getStatusCode(), + retryAfterDuration, + response.getSubStatusCode(), + cosmosItemOperation + )); + } + + BridgeInternal.addTransactionBatchResultInResponse(response, results); + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/ItemBatchOperation.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/ItemBatchOperation.java new file mode 100644 index 000000000000..7ff74b406c47 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/ItemBatchOperation.java @@ -0,0 +1,108 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.batch; + +import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.CosmosItemOperation; +import com.azure.cosmos.CosmosItemOperationType; +import com.azure.cosmos.implementation.JsonSerializable; +import com.azure.cosmos.implementation.RequestOptions; +import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; +import com.azure.cosmos.models.PartitionKey; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; + +/** + * Represents an operation on an item which will be executed as part of a batch request on a container. This will be + * serialized and sent in the request. + * + * @param The type of item. + */ +public final class ItemBatchOperation implements CosmosItemOperation { + + private TInternal item; + + private final String id; + private final PartitionKey partitionKey; + private final CosmosItemOperationType operationType; + private final RequestOptions requestOptions; + + public ItemBatchOperation( + final CosmosItemOperationType operationType, + final String id, + final PartitionKey partitionKey, + final RequestOptions requestOptions, + final TInternal item) { + + checkNotNull(operationType, "expected non-null operationType"); + + this.operationType = operationType; + this.partitionKey = partitionKey; + this.id = id; + this.item = item; + this.requestOptions = requestOptions; + } + + /** + * Writes a single operation to JsonSerializable. + * TODO(rakkuma): Similarly for hybrid row, operation needs to be written in Hybrid row. + * Issue: https://github.com/Azure/azure-sdk-for-java/issues/15856 + * + * @return instance of JsonSerializable containing values for a operation. + */ + JsonSerializable serializeOperation() { + final JsonSerializable jsonSerializable = new JsonSerializable(); + + jsonSerializable.set( + BatchRequestResponseConstant.FIELD_OPERATION_TYPE, + BridgeInternal.getOperationValueForCosmosItemOperationType(this.getOperationType())); + + if (StringUtils.isNotEmpty(this.getId())) { + jsonSerializable.set(BatchRequestResponseConstant.FIELD_ID, this.getId()); + } + + if (this.getItemInternal() != null) { + jsonSerializable.set(BatchRequestResponseConstant.FIELD_RESOURCE_BODY, this.getItemInternal()); + } + + if (this.getRequestOptions() != null) { + RequestOptions requestOptions = this.getRequestOptions(); + + if (StringUtils.isNotEmpty(requestOptions.getIfMatchETag())) { + jsonSerializable.set(BatchRequestResponseConstant.FIELD_IF_MATCH, requestOptions.getIfMatchETag()); + } + + if (StringUtils.isNotEmpty(requestOptions.getIfNoneMatchETag())) { + jsonSerializable.set(BatchRequestResponseConstant.FIELD_IF_NONE_MATCH, requestOptions.getIfNoneMatchETag()); + } + } + + return jsonSerializable; + } + + TInternal getItemInternal() { + return this.item; + } + + @SuppressWarnings("unchecked") + public T getItem() { + return (T)this.item; + } + + public String getId() { + return this.id; + } + + public PartitionKey getPartitionKeyValue() { + return partitionKey; + } + + public CosmosItemOperationType getOperationType() { + return this.operationType; + } + + public RequestOptions getRequestOptions() { + return this.requestOptions; + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/ServerBatchRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/ServerBatchRequest.java new file mode 100644 index 000000000000..c55cdef8f763 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/ServerBatchRequest.java @@ -0,0 +1,125 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.batch; + +import com.azure.cosmos.CosmosItemOperation; +import com.azure.cosmos.implementation.JsonSerializable; +import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList; +import com.fasterxml.jackson.databind.node.ArrayNode; + +import java.util.List; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkState; + +/** + * This class represents a server batch request. + */ +public abstract class ServerBatchRequest { + + private final int maxBodyLength; + private final int maxOperationCount; + + private String requestBody; + private List operations; + private boolean isAtomicBatch = false; + private boolean shouldContinueOnError = false; + + /** + * Initializes a new {@link ServerBatchRequest request} instance. + * + * @param maxBodyLength Maximum length allowed for the request body. + * @param maxOperationCount Maximum number of operations allowed in the request. + */ + ServerBatchRequest(int maxBodyLength, int maxOperationCount) { + this.maxBodyLength = maxBodyLength; + this.maxOperationCount = maxOperationCount; + } + + /** + * Adds as many operations as possible from the given list of operations. + * TODO(rakkuma): Similarly for hybrid row, request needs to be parsed to create a request body in any form. + * Issue: https://github.com/Azure/azure-sdk-for-java/issues/15856 + * + * Operations are added in order while ensuring the request body never exceeds {@link #maxBodyLength}. + * + * @param operations operations to be added; read-only. + * + * @return Any pending operations that were not included in the request. + */ + final List createBodyOfBatchRequest(final List operations) { + + checkNotNull(operations, "expected non-null operations"); + + int totalSerializedLength = 0; + int totalOperationCount = 0; + + final ArrayNode arrayNode = Utils.getSimpleObjectMapper().createArrayNode(); + + for(CosmosItemOperation operation : operations) { + if (operation instanceof ItemBatchOperation) { + final ItemBatchOperation itemBatchOperation = (ItemBatchOperation) operation; + final JsonSerializable operationJsonSerializable = itemBatchOperation.serializeOperation(); + + // TODO(rakkuma): If the string contains unicode the byte encoding len will be more. Fix it. + // Issue: https://github.com/Azure/azure-sdk-for-java/issues/16112 + final int operationSerializedLength = operationJsonSerializable.toString().length(); + + if (totalOperationCount != 0 && + (totalSerializedLength + operationSerializedLength > this.maxBodyLength || totalOperationCount + 1 > this.maxOperationCount)) { + // Apply the limit only if at least there is one operation in selected operations. + break; + } + + totalSerializedLength += operationSerializedLength; + totalOperationCount++; + + arrayNode.add(operationJsonSerializable.getPropertyBag()); + } else { + throw new UnsupportedOperationException("Unknown CosmosItemOperation."); + } + } + + // TODO(rakkuma): This should change to byte array later as optimisation. + // Issue: https://github.com/Azure/azure-sdk-for-java/issues/16112 + this.requestBody = arrayNode.toString(); + + this.operations = operations.subList(0, totalOperationCount); + return operations.subList(totalOperationCount, operations.size()); + } + + public final String getRequestBody() { + checkState(this.requestBody != null, "expected non-null body"); + + return this.requestBody; + } + + /** + * Gets the list of {@link CosmosItemOperation operations} in this {@link ServerBatchRequest batch request}. + * + * The list returned by this method is unmodifiable. + * + * @return the list of {@link CosmosItemOperation operations} in this {@link ServerBatchRequest batch request}. + */ + public final List getOperations() { + return UnmodifiableList.unmodifiableList(this.operations); + } + + public boolean isAtomicBatch() { + return this.isAtomicBatch; + } + + void setAtomicBatch(boolean atomicBatch) { + this.isAtomicBatch = atomicBatch; + } + + public boolean isShouldContinueOnError() { + return this.shouldContinueOnError; + } + + void setShouldContinueOnError(boolean shouldContinueOnError) { + this.shouldContinueOnError = shouldContinueOnError; + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/SinglePartitionKeyServerBatchRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/SinglePartitionKeyServerBatchRequest.java new file mode 100644 index 000000000000..0a433fd62952 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/SinglePartitionKeyServerBatchRequest.java @@ -0,0 +1,59 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.batch; + +import com.azure.cosmos.CosmosItemOperation; +import com.azure.cosmos.models.PartitionKey; +import java.util.List; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; + +public final class SinglePartitionKeyServerBatchRequest extends ServerBatchRequest { + + private final PartitionKey partitionKey; + + /** + * Initializes a new instance of the {@link SinglePartitionKeyServerBatchRequest} class. Single partition key server + * request. + * + * @param partitionKey Partition key that applies to all operations in this request. + */ + private SinglePartitionKeyServerBatchRequest(final PartitionKey partitionKey) { + super(Integer.MAX_VALUE, Integer.MAX_VALUE); + this.partitionKey = partitionKey; + } + + /** + * Creates an instance of {@link SinglePartitionKeyServerBatchRequest}. The body of the request is populated with + * operations till it reaches the provided maxBodyLength. + * + * @param partitionKey Partition key of the request. + * @param operations Operations to be added into this batch request. + * + * @return A newly created instance of {@link SinglePartitionKeyServerBatchRequest}. + */ + static SinglePartitionKeyServerBatchRequest createBatchRequest( + final PartitionKey partitionKey, + final List operations) { + + checkNotNull(partitionKey, "expected non-null partitionKey"); + checkNotNull(operations, "expected non-null operations"); + + final SinglePartitionKeyServerBatchRequest request = new SinglePartitionKeyServerBatchRequest(partitionKey); + request.createBodyOfBatchRequest(operations); + + return request; + } + + /** + * Returns the {@link PartitionKey partition key} that applies to all operations in this {@link + * SinglePartitionKeyServerBatchRequest batch request}. + * + * @return the {@link PartitionKey partition key} that applies to all operations in this {@link + * SinglePartitionKeyServerBatchRequest batch request}. + */ + public PartitionKey getPartitionKeyValue() { + return this.partitionKey; + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java index 3477b33827f6..3dade23bb1ef 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java @@ -13,6 +13,7 @@ import com.azure.cosmos.implementation.IAuthorizationTokenProvider; import com.azure.cosmos.implementation.ISessionContainer; import com.azure.cosmos.implementation.Integers; +import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.RMResources; import com.azure.cosmos.implementation.RequestChargeTracker; import com.azure.cosmos.implementation.RequestTimeoutException; @@ -156,10 +157,11 @@ Mono writePrivateAsync( }).flatMap(primaryUri -> { try { primaryURI.set(primaryUri); - if (this.useMultipleWriteLocations && + if ((this.useMultipleWriteLocations || request.getOperationType() == OperationType.Batch) && RequestHelper.getConsistencyLevelToUse(this.serviceConfigReader, request) == ConsistencyLevel.SESSION) { // Set session token to ensure session consistency for write requests - // when writes can be issued to multiple locations + // 1. when writes can be issued to multiple locations + // 2. When we have Batch requests, since it can have Reads in it. SessionTokenHelper.setPartitionLocalSessionToken(request, this.sessionContainer); } else { // When writes can only go to single location, there is no reason diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/HttpTransportClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/HttpTransportClient.java index 413a1ddae4d5..0475f86ff0aa 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/HttpTransportClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/HttpTransportClient.java @@ -285,6 +285,7 @@ private HttpRequest prepareHttpMessage( // HttpRequestMessage -> StreamContent -> MemoryStream all get disposed, the original stream will be left open. switch (resourceOperation.operationType) { case Create: + case Batch: requestUri = getResourceFeedUri(resourceOperation.resourceType, physicalAddress.getURIAsString(), request); method = HttpMethod.POST; assert request.getContentAsByteArrayFlux() != null; @@ -480,6 +481,13 @@ private HttpRequest prepareHttpMessage( HttpTransportClient.addHeader(httpRequestHeaders, CustomHeaders.HttpHeaders.EXCLUDE_SYSTEM_PROPERTIES, request); + if (resourceOperation.operationType == OperationType.Batch) { + HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.IS_BATCH_REQUEST, request); + HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.SHOULD_BATCH_CONTINUE_ON_ERROR, request); + HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.IS_BATCH_ORDERED, request); + HttpTransportClient.addHeader(httpRequestHeaders, HttpConstants.HttpHeaders.IS_BATCH_ATOMIC, request); + } + return httpRequestMessage; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java index e37ef30552b9..67c2908f8cff 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConstants.java @@ -259,7 +259,8 @@ public enum RntbdOperationType { AbortPartitionMigration((short) 0x001F, OperationType.AbortPartitionMigration), PreReplaceValidation((short) 0x0020, OperationType.PreReplaceValidation), AddComputeGatewayRequestCharges((short) 0x0021, OperationType.AddComputeGatewayRequestCharges), - MigratePartition((short) 0x0022, OperationType.MigratePartition); + MigratePartition((short) 0x0022, OperationType.MigratePartition), + Batch((short) 0x0025, OperationType.Batch); private final short id; private final OperationType type; @@ -341,6 +342,8 @@ public static RntbdOperationType fromId(final short id) { return RntbdOperationType.AddComputeGatewayRequestCharges; case 0x0022: return RntbdOperationType.MigratePartition; + case 0x0025: + return RntbdOperationType.Batch; default: throw new DecoderException(lenientFormat("expected byte value matching %s value, not %s", RntbdOperationType.class.getSimpleName(), @@ -416,6 +419,8 @@ public static RntbdOperationType fromType(OperationType type) { return RntbdOperationType.MigratePartition; case AddComputeGatewayRequestCharges: return RntbdOperationType.AddComputeGatewayRequestCharges; + case Batch: + return RntbdOperationType.Batch; default: throw new IllegalArgumentException(lenientFormat("unrecognized operation type: %s", type)); } @@ -572,6 +577,9 @@ public enum RntbdRequestHeader implements RntbdHeader { AllowTentativeWrites((short) 0x0066, RntbdTokenType.Byte, false), IsUserRequest((short) 0x0067, RntbdTokenType.Byte, false), SharedOfferThroughput((short) 0x0068, RntbdTokenType.ULong, false), + IsBatchAtomic((short) 0x0073, RntbdTokenType.Byte, false), + ShouldBatchContinueOnError((short) 0x0074, RntbdTokenType.Byte, false), + IsBatchOrdered((short) 0x0075, RntbdTokenType.Byte, false), ReturnPreference((short) 0x0082, RntbdTokenType.Byte, false); public static final ImmutableMap map; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestFrame.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestFrame.java index 87483ce79c4f..dd3d8379501b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestFrame.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestFrame.java @@ -203,6 +203,8 @@ private static RntbdOperationType map(final OperationType operationType) { return RntbdOperationType.MigratePartition; case AddComputeGatewayRequestCharges: return RntbdOperationType.AddComputeGatewayRequestCharges; + case Batch: + return RntbdOperationType.Batch; default: final String reason = Strings.lenientFormat("Unrecognized operation type: %s", operationType); throw new UnsupportedOperationException(reason); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestHeaders.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestHeaders.java index fc7e7f0c6906..9178a9fafa92 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestHeaders.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestHeaders.java @@ -151,6 +151,9 @@ final class RntbdRequestHeaders extends RntbdTokenStream { this.fillTokenFromHeader(headers, this::getTargetLsn, HttpHeaders.TARGET_LSN); this.fillTokenFromHeader(headers, this::getTimeToLiveInSeconds, BackendHeaders.TIME_TO_LIVE_IN_SECONDS); this.fillTokenFromHeader(headers, this::getTransportRequestID, HttpHeaders.TRANSPORT_REQUEST_ID); + this.fillTokenFromHeader(headers, this::isBatchAtomic, HttpHeaders.IS_BATCH_ATOMIC); + this.fillTokenFromHeader(headers, this::shouldBatchContinueOnError, HttpHeaders.SHOULD_BATCH_CONTINUE_ON_ERROR); + this.fillTokenFromHeader(headers, this::isBatchOrdered, HttpHeaders.IS_BATCH_ORDERED); // Will be null in case of direct, which is fine - BE will use the value slice the connection context this. // When this is used in Gateway, the header value will be populated with the proxied HTTP request's header, @@ -564,6 +567,19 @@ private RntbdToken getUserName() { return this.get(RntbdRequestHeader.UserName); } + // Batch + private RntbdToken isBatchAtomic() { + return this.get(RntbdRequestHeader.IsBatchAtomic); + } + + private RntbdToken shouldBatchContinueOnError() { + return this.get(RntbdRequestHeader.ShouldBatchContinueOnError); + } + + private RntbdToken isBatchOrdered() { + return this.get(RntbdRequestHeader.IsBatchOrdered); + } + private void addAimHeader(final Map headers) { final String value = headers.get(HttpHeaders.A_IM); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/Beta.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/Beta.java index b585233ba6b0..800578a4e673 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/Beta.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/Beta.java @@ -43,6 +43,8 @@ public enum SinceVersion { /** v4.5.1 */ V4_5_1, /** v4.6.0 */ - V4_6_0 + V4_6_0, + /** v4.7.0 */ + V4_7_0 } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/BatchOperationResultTests.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/BatchOperationResultTests.java new file mode 100644 index 000000000000..67f815aa5841 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/BatchOperationResultTests.java @@ -0,0 +1,72 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos; + +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.implementation.batch.ItemBatchOperation; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.netty.handler.codec.http.HttpResponseStatus; +import org.testng.annotations.Test; + +import java.time.Duration; +import static org.assertj.core.api.Assertions.assertThat; + +public class BatchOperationResultTests { + + private static final int TIMEOUT = 40000; + private ObjectNode objectNode = Utils.getSimpleObjectMapper().createObjectNode(); + private ItemBatchOperation operation = new ItemBatchOperation<>( + CosmosItemOperationType.READ, + null, + null, + null, + null + ); + + private TransactionalBatchOperationResult createTestResult() { + TransactionalBatchOperationResult result = BridgeInternal.createTransactionBatchResult( + "TestETag", + 1.4, + objectNode, + HttpResponseStatus.OK.code(), + Duration.ofMillis(1234), + HttpConstants.SubStatusCodes.NAME_CACHE_IS_STALE, + operation + ); + + return result; + } + + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void propertiesAreSetThroughCtor() { + TransactionalBatchOperationResult result = createTestResult(); + + assertThat(result.getStatusCode()).isEqualTo(HttpResponseStatus.OK.code()); + assertThat(result.getSubStatusCode()).isEqualTo(HttpConstants.SubStatusCodes.NAME_CACHE_IS_STALE); + assertThat(result.getETag()).isEqualTo("TestETag"); + assertThat(result.getRequestCharge()).isEqualTo(1.4); + assertThat(result.getRetryAfterDuration()).isEqualTo(Duration.ofMillis(1234)); + assertThat(result.getResourceObject()).isSameAs(objectNode); + assertThat(result.getOperation()).isSameAs(operation); + } + + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void isSuccessStatusCodeTrueFor200To299() { + for (int x = 100; x < 999; ++x) { + TransactionalBatchOperationResult result = BridgeInternal.createTransactionBatchResult( + null, + 0.0, + null, + x, + null, + 0, + operation + ); + + boolean success = x >= 200 && x <= 299; + assertThat(result.isSuccessStatusCode()).isEqualTo(success); + } + } +} diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/BatchTestBase.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/BatchTestBase.java new file mode 100644 index 000000000000..78af0230314f --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/BatchTestBase.java @@ -0,0 +1,340 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos; + +import com.azure.cosmos.implementation.ISessionToken; +import com.azure.cosmos.implementation.SessionTokenHelper; +import com.azure.cosmos.implementation.VectorSessionToken; +import com.azure.cosmos.implementation.apachecommons.collections.map.UnmodifiableMap; +import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.rx.TestSuiteBase; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.netty.handler.codec.http.HttpResponseStatus; +import org.assertj.core.api.Assertions; +import org.assertj.core.data.Offset; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.util.Objects; +import java.util.Random; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +public class BatchTestBase extends TestSuiteBase { + + private Random random = new Random(); + String partitionKey1 = "TBD1"; + + // items in partitionKey1 + TestDoc TestDocPk1ExistingA; + TestDoc TestDocPk1ExistingB ; + TestDoc TestDocPk1ExistingC; + TestDoc TestDocPk1ExistingD; + + public BatchTestBase(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + void createJsonTestDocs(CosmosContainer container) { + this.TestDocPk1ExistingA = this.createJsonTestDoc(container, this.partitionKey1); + this.TestDocPk1ExistingB = this.createJsonTestDoc(container, this.partitionKey1); + this.TestDocPk1ExistingC = this.createJsonTestDoc(container, this.partitionKey1); + this.TestDocPk1ExistingD = this.createJsonTestDoc(container, this.partitionKey1); + } + + void createJsonTestDocs(CosmosAsyncContainer container) { + this.TestDocPk1ExistingA = this.createJsonTestDoc(container, this.partitionKey1); + this.TestDocPk1ExistingB = this.createJsonTestDoc(container, this.partitionKey1); + this.TestDocPk1ExistingC = this.createJsonTestDoc(container, this.partitionKey1); + this.TestDocPk1ExistingD = this.createJsonTestDoc(container, this.partitionKey1); + } + + TestDoc populateTestDoc(String partitionKey) { + return populateTestDoc(partitionKey, 20); + } + + TestDoc populateTestDoc(String partitionKey, int minDesiredSize) { + String description = StringUtils.repeat("x", minDesiredSize); + return new TestDoc(UUID.randomUUID().toString(), this.random.nextInt(), description, partitionKey); + } + + TestDoc getTestDocCopy(TestDoc testDoc) { + return new TestDoc(testDoc.getId(), testDoc.getCost(), testDoc.getDescription(), testDoc.getStatus()); + } + + void verifyByRead(CosmosContainer container, TestDoc doc) { + verifyByRead(container, doc, null); + } + + void verifyByRead(CosmosContainer container, TestDoc doc, String eTag) { + PartitionKey partitionKey = this.getPartitionKey(doc.getStatus()); + + CosmosItemResponse response = container.readItem(doc.getId(), partitionKey, TestDoc.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpResponseStatus.OK.code()); + assertThat(response.getItem()).isEqualTo(doc); + + if (eTag != null) { + assertThat(response.getETag()).isEqualTo(eTag); + } + } + + void verifyNotFound(CosmosContainer container, TestDoc doc) { + String id = doc.getId(); + PartitionKey partitionKey = this.getPartitionKey(doc.getStatus()); + + try { + CosmosItemResponse response = container.readItem(id, partitionKey, TestDoc.class); + Assertions.fail("Should throw NOT_FOUND exception"); + } catch (CosmosException ex) { + assertThat(ex.getStatusCode()).isEqualTo(HttpResponseStatus.NOT_FOUND.code()); + } + } + + PartitionKey getPartitionKey(String partitionKey) { + return new PartitionKey(partitionKey); + } + + private TestDoc createJsonTestDoc(CosmosContainer container, String partitionKey) { + return createJsonTestDoc(container, partitionKey, 20); + } + + TestDoc createJsonTestDoc(CosmosContainer container, String partitionKey, int minDesiredSize) { + TestDoc doc = this.populateTestDoc(partitionKey, minDesiredSize); + CosmosItemResponse createResponse = container.createItem(doc, this.getPartitionKey(partitionKey), null); + assertThat(createResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); + return doc; + } + + private TestDoc createJsonTestDoc(CosmosAsyncContainer container, String partitionKey) { + return createJsonTestDoc(container, partitionKey, 20); + } + + TestDoc createJsonTestDoc(CosmosAsyncContainer container, String partitionKey, int minDesiredSize) { + TestDoc doc = this.populateTestDoc(partitionKey, minDesiredSize); + CosmosItemResponse createResponse = container.createItem(doc, this.getPartitionKey(partitionKey), null).block(); + assertThat(createResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); + return doc; + } + + public Random getRandom() { + return random; + } + + ISessionToken getSessionToken(String sessionToken) { + String[] tokenParts = org.apache.commons.lang3.StringUtils.split(sessionToken, ':'); + return SessionTokenHelper.parse(tokenParts[1]); + } + + String getDifferentLSNToken(String token, long lsnDifferent) throws Exception { + String[] tokenParts = org.apache.commons.lang3.StringUtils.split(token, ':'); + ISessionToken sessionToken = SessionTokenHelper.parse(tokenParts[1]); + ISessionToken differentSessionToken = createSessionToken(sessionToken, sessionToken.getLSN() + lsnDifferent); + return String.format("%s:%s", tokenParts[0], differentSessionToken.convertToString()); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private static ISessionToken createSessionToken(ISessionToken from, long globalLSN) throws Exception { + // Creates session token with specified GlobalLSN + if (from instanceof VectorSessionToken) { + VectorSessionToken fromSessionToken = (VectorSessionToken) from; + Field fieldVersion = VectorSessionToken.class.getDeclaredField("version"); + fieldVersion.setAccessible(true); + Long version = (Long) fieldVersion.get(fromSessionToken); + + Field fieldLocalLsnByRegion = VectorSessionToken.class.getDeclaredField("localLsnByRegion"); + fieldLocalLsnByRegion.setAccessible(true); + UnmodifiableMap localLsnByRegion = (UnmodifiableMap) fieldLocalLsnByRegion.get(fromSessionToken); + + Constructor constructor = VectorSessionToken.class.getDeclaredConstructor(long.class, long.class, UnmodifiableMap.class); + constructor.setAccessible(true); + VectorSessionToken vectorSessionToken = constructor.newInstance(version, globalLSN, localLsnByRegion); + return vectorSessionToken; + } else { + throw new IllegalArgumentException(); + } + } + + void verifyBatchProcessed(TransactionalBatchResponse batchResponse, int numberOfOperations) { + this.verifyBatchProcessed(batchResponse, numberOfOperations, HttpResponseStatus.OK); + } + + void verifyBatchProcessed(TransactionalBatchResponse batchResponse, int numberOfOperations, HttpResponseStatus expectedStatusCode) { + assertThat(batchResponse).isNotNull(); + assertThat(batchResponse.getStatusCode()) + .as("Batch server response had StatusCode {0} instead of {1} expected and had ErrorMessage {2}", + batchResponse.getStatusCode(), expectedStatusCode.code()) + .isEqualTo(expectedStatusCode.code()); + + assertThat(batchResponse.size()).isEqualTo(numberOfOperations); + assertThat(batchResponse.getRequestCharge()).isPositive(); + assertThat(batchResponse.getDiagnostics().toString()).isNotEmpty(); + + // Allow a delta since we round both the total charge and the individual operation + // charges to 2 decimal places. + assertThat(batchResponse.getRequestCharge()) + .isCloseTo(batchResponse.getResults().stream().mapToDouble(TransactionalBatchOperationResult::getRequestCharge).sum(), + Offset.offset(0.1)); + } + + public static class TestDoc { + public String id; + public int cost; + public String description; + + @JsonProperty("mypk") + public String status; + + public TestDoc() { + + } + + public TestDoc(String id, int cost, String description, String status) { + this.id = id; + this.cost = cost; + this.description = description; + this.status = status; + } + + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + TestDoc testDoc2 = (TestDoc) obj; + return (this.getId().equals(testDoc2.getId()) && + this.getCost() == testDoc2.getCost()) && + this.getDescription().equals(testDoc2.getDescription()) && + this.getStatus().equals(testDoc2.getStatus()); + } + + @Override + public int hashCode() { + int hashCode = 1652434776; + hashCode = (hashCode * -1521134295) + this.id.hashCode(); + hashCode = (hashCode * -1521134295) + this.cost; + hashCode = (hashCode * -1521134295) + this.description.hashCode(); + hashCode = (hashCode * -1521134295) + this.status.hashCode(); + return hashCode; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public int getCost() { + return cost; + } + + public void setCost(int cost) { + this.cost = cost; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + } + + public static class EventDoc { + + public String id; + int clicks; + int views; + String type; + + @JsonProperty("mypk") + public String partitionKey; + + + public EventDoc() { + + } + + public EventDoc(String id, int clicks, int views, String type, String partitionKey) { + this.id = id; + this.clicks = clicks; + this.views = views; + this.type = type; + this.partitionKey = partitionKey; + } + + public String getId() { + return id; + } + + public int getClicks() { + return clicks; + } + + public int getViews() { + return views; + } + + public String getType() { + return type; + } + + public String getPartitionKey() { + return partitionKey; + } + + public void setId(String id) { + this.id = id; + } + + public void setClicks(int clicks) { + this.clicks = clicks; + } + + public void setViews(int views) { + this.views = views; + } + + public void setType(String type) { + this.type = type; + } + + public void setPartitionKey(String partitionKey) { + this.partitionKey = partitionKey; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + EventDoc eventDoc = (EventDoc) o; + return clicks == eventDoc.clicks && + views == eventDoc.views && + Objects.equals(id, eventDoc.id) && + Objects.equals(type, eventDoc.type) && + Objects.equals(partitionKey, eventDoc.partitionKey); + } + + @Override + public int hashCode() { + return Objects.hash(id, clicks, views, type, partitionKey); + } + } +} diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/TransactionalBatchAsyncContainerTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/TransactionalBatchAsyncContainerTest.java new file mode 100644 index 000000000000..1965a8691f0c --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/TransactionalBatchAsyncContainerTest.java @@ -0,0 +1,133 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos; + +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.models.CosmosItemResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import org.assertj.core.api.Assertions; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; +import reactor.core.publisher.Mono; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public class TransactionalBatchAsyncContainerTest extends BatchTestBase { + + private CosmosAsyncClient batchClient; + private CosmosAsyncContainer batchAsyncContainer; + + @Factory(dataProvider = "clientBuildersWithDirectSession") + public TransactionalBatchAsyncContainerTest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @BeforeClass(groups = {"simple"}, timeOut = SETUP_TIMEOUT) + public void before_TransactionalBatchAsyncContainerTest() { + assertThat(this.batchClient).isNull(); + this.batchClient = getClientBuilder().buildAsyncClient(); + batchAsyncContainer = getSharedMultiPartitionCosmosContainer(this.batchClient); + } + + @AfterClass(groups = {"simple"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + safeCloseAsync(this.batchClient); + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT) + public void batchExecutionRepeat() { + TestDoc firstDoc = this.populateTestDoc(this.partitionKey1); + TestDoc replaceDoc = this.getTestDocCopy(firstDoc); + replaceDoc.setCost(replaceDoc.getCost() + 1); + TransactionalBatch batch = TransactionalBatch.createTransactionalBatch(this.getPartitionKey(this.partitionKey1)); + batch.createItemOperation(firstDoc); + batch.replaceItemOperation(replaceDoc.getId(), replaceDoc); + + Mono batchResponseMono = batchAsyncContainer.executeTransactionalBatch(batch); + + TransactionalBatchResponse batchResponse1 = batchResponseMono.block(); + this.verifyBatchProcessed(batchResponse1, 2); + + assertThat(batchResponse1.getResults().get(0).getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); + assertThat(batchResponse1.getResults().get(1).getStatusCode()).isEqualTo(HttpResponseStatus.OK.code()); + + // Block again. + TransactionalBatchResponse batchResponse2 = batchResponseMono.block(); + assertThat(batchResponse2.getStatusCode()).isEqualTo(HttpResponseStatus.CONFLICT.code()); + assertThat(batchResponse2.getResults().get(0).getStatusCode()).isEqualTo(HttpResponseStatus.CONFLICT.code()); + assertThat(batchResponse2.getResults().get(1).getStatusCode()).isEqualTo(HttpResponseStatus.FAILED_DEPENDENCY.code()); + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT * 100) + public void batchInvalidSessionToken() throws Exception { + CosmosAsyncContainer container = batchAsyncContainer; + this.createJsonTestDocs(container); + + CosmosItemResponse readResponse = container.readItem( + this.TestDocPk1ExistingC.getId(), + this.getPartitionKey(this.partitionKey1), + TestDoc.class).block(); + + assertThat(readResponse.getStatusCode()).isEqualTo(HttpResponseStatus.OK.code()); + String invalidSessionToken = this.getDifferentLSNToken(readResponse.getSessionToken(), 2000); + + { + // Batch without Read operation + TestDoc testDocToCreate = this.populateTestDoc(this.partitionKey1); + TestDoc testDocToReplace = this.getTestDocCopy(this.TestDocPk1ExistingA); + testDocToReplace.setCost(testDocToReplace.getCost() + 1); + TestDoc testDocToUpsert = this.populateTestDoc(this.partitionKey1); + + TransactionalBatch batch = TransactionalBatch.createTransactionalBatch(this.getPartitionKey(this.partitionKey1)); + batch.createItemOperation(testDocToCreate); + batch.replaceItemOperation(testDocToReplace.getId(), testDocToReplace); + batch.upsertItemOperation(testDocToUpsert); + batch.deleteItemOperation(this.TestDocPk1ExistingC.getId()); + + TransactionalBatchResponse batchResponse = container.executeTransactionalBatch( + batch, new TransactionalBatchRequestOptions().setSessionToken(invalidSessionToken)).block(); + + this.verifyBatchProcessed(batchResponse, 4); + + assertThat(batchResponse.getResults().get(0).getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); + assertThat(batchResponse.getResults().get(1).getStatusCode()).isEqualTo(HttpResponseStatus.OK.code()); + assertThat(batchResponse.getResults().get(2).getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); + assertThat(batchResponse.getResults().get(3).getStatusCode()).isEqualTo(HttpResponseStatus.NO_CONTENT.code()); + + List batchOperations = batch.getOperations(); + for (int index = 0; index < batchOperations.size(); index++) { + assertThat(batchResponse.getResults().get(index).getOperation()).isEqualTo(batchOperations.get(index)); + } + } + + { + // Batch with Read operation + TestDoc testDocToCreate = this.populateTestDoc(this.partitionKey1); + TestDoc testDocToReplace = this.getTestDocCopy(this.TestDocPk1ExistingB); + testDocToReplace.setCost(testDocToReplace.getCost() + 1); + TestDoc testDocToUpsert = this.populateTestDoc(this.partitionKey1); + + TransactionalBatch batch = TransactionalBatch.createTransactionalBatch(this.getPartitionKey(this.partitionKey1)); + batch.createItemOperation(testDocToCreate); + batch.replaceItemOperation(testDocToReplace.getId(), testDocToReplace); + batch.upsertItemOperation(testDocToUpsert); + batch.deleteItemOperation(this.TestDocPk1ExistingD.getId()); + batch.readItemOperation(this.TestDocPk1ExistingA.getId()); + + try { + container.executeTransactionalBatch( + batch, new TransactionalBatchRequestOptions().setSessionToken(invalidSessionToken)).block(); + + Assertions.fail("Should throw NOT_FOUND/READ_SESSION_NOT_AVAILABLE exception"); + } catch (CosmosException ex) { + assertThat(ex.getStatusCode()).isEqualTo(HttpResponseStatus.NOT_FOUND.code()); + assertThat(ex.getSubStatusCode()).isEqualTo(HttpConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE); + } + } + } +} diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/TransactionalBatchTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/TransactionalBatchTest.java new file mode 100644 index 000000000000..8dd694fd552a --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/TransactionalBatchTest.java @@ -0,0 +1,540 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos; + +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.ISessionToken; +import com.azure.cosmos.implementation.guava25.base.Function; +import com.azure.cosmos.models.CosmosItemResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import org.assertj.core.api.Assertions; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.UUID; + +import static com.azure.cosmos.implementation.batch.BatchRequestResponseConstant.MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES; +import static com.azure.cosmos.implementation.batch.BatchRequestResponseConstant.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST; +import static org.assertj.core.api.Assertions.assertThat; + +public class TransactionalBatchTest extends BatchTestBase { + + private CosmosClient batchClient; + private CosmosContainer batchContainer; + + @Factory(dataProvider = "simpleClientBuildersWithDirect") + public TransactionalBatchTest(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @BeforeClass(groups = {"simple"}, timeOut = SETUP_TIMEOUT) + public void before_TransactionalBatchTest() { + assertThat(this.batchClient).isNull(); + this.batchClient = getClientBuilder().buildClient(); + CosmosAsyncContainer batchAsyncContainer = getSharedMultiPartitionCosmosContainer(this.batchClient.asyncClient()); + batchContainer = batchClient.getDatabase(batchAsyncContainer.getDatabase().getId()).getContainer(batchAsyncContainer.getId()); + } + + @AfterClass(groups = {"simple"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + safeCloseSyncClient(this.batchClient); + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT) + public void batchOrdered() { + CosmosContainer container = this.batchContainer; + + TestDoc firstDoc = this.populateTestDoc(this.partitionKey1); + TestDoc replaceDoc = this.getTestDocCopy(firstDoc); + replaceDoc.setCost(replaceDoc.getCost() + 1); + + TransactionalBatch batch = TransactionalBatch.createTransactionalBatch(this.getPartitionKey(this.partitionKey1)); + batch.createItemOperation(firstDoc); + batch.replaceItemOperation(replaceDoc.getId(), replaceDoc); + + TransactionalBatchResponse batchResponse = container.executeTransactionalBatch(batch); + + this.verifyBatchProcessed(batchResponse, 2); + + assertThat(batchResponse.getResults().get(0).getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); + assertThat(batchResponse.getResults().get(1).getStatusCode()).isEqualTo(HttpResponseStatus.OK.code()); + + List batchOperations = batch.getOperations(); + for (int index = 0; index < batchOperations.size(); index++) { + assertThat(batchResponse.getResults().get(index).getOperation()).isEqualTo(batchOperations.get(index)); + } + + // Ensure that the replace overwrote the doc from the first operation + this.verifyByRead(container, replaceDoc); + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT) + public void batchMultipleItemExecution() { + CosmosContainer container = this.batchContainer; + + TestDoc firstDoc = this.populateTestDoc(this.partitionKey1); + TestDoc replaceDoc = this.getTestDocCopy(firstDoc); + replaceDoc.setCost(replaceDoc.getCost() + 1); + + EventDoc eventDoc1 = new EventDoc(UUID.randomUUID().toString(), 2, 4, "type1", this.partitionKey1); + EventDoc readEventDoc = new EventDoc(UUID.randomUUID().toString(), 6, 14, "type2", this.partitionKey1); + CosmosItemResponse createResponse = container.createItem(readEventDoc, this.getPartitionKey(this.partitionKey1), null); + assertThat(createResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); + + TransactionalBatch batch = TransactionalBatch.createTransactionalBatch(this.getPartitionKey(this.partitionKey1)); + batch.createItemOperation(firstDoc); + batch.createItemOperation(eventDoc1); + batch.replaceItemOperation(replaceDoc.getId(), replaceDoc); + batch.readItemOperation(readEventDoc.getId()); + + TransactionalBatchResponse batchResponse = container.executeTransactionalBatch(batch); + + this.verifyBatchProcessed(batchResponse, 4); + + assertThat(batchResponse.getResults().get(0).getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); + assertThat(batchResponse.getResults().get(0).getItem(TestDoc.class)).isEqualTo(firstDoc); + + assertThat(batchResponse.getResults().get(1).getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); + assertThat(batchResponse.getResults().get(1).getItem(EventDoc.class)).isEqualTo(eventDoc1); + + assertThat(batchResponse.getResults().get(2).getStatusCode()).isEqualTo(HttpResponseStatus.OK.code()); + assertThat(batchResponse.getResults().get(2).getItem(TestDoc.class)).isEqualTo(replaceDoc); + + assertThat(batchResponse.getResults().get(3).getStatusCode()).isEqualTo(HttpResponseStatus.OK.code()); + assertThat(batchResponse.getResults().get(3).getItem(EventDoc.class)).isEqualTo(readEventDoc); + + // Ensure that the replace overwrote the doc from the first operation + this.verifyByRead(container, replaceDoc); + + List batchOperations = batch.getOperations(); + for (int index = 0; index < batchOperations.size(); index++) { + assertThat(batchResponse.getResults().get(index).getOperation()).isEqualTo(batchOperations.get(index)); + } + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT) + public void batchItemETagTest() { + CosmosContainer container = batchContainer; + this.createJsonTestDocs(container); + + { + BatchTestBase.TestDoc testDocToCreate = this.populateTestDoc(this.partitionKey1); + + BatchTestBase.TestDoc testDocToReplace = this.getTestDocCopy(this.TestDocPk1ExistingA); + testDocToReplace.setCost(testDocToReplace.getCost() + 1); + + CosmosItemResponse response = container.readItem( + this.TestDocPk1ExistingA.getId(), + this.getPartitionKey(this.partitionKey1), + TestDoc.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpResponseStatus.OK.code()); + + TransactionalBatchItemRequestOptions firstReplaceOptions = new TransactionalBatchItemRequestOptions(); + firstReplaceOptions.setIfMatchETag(response.getETag()); + + TransactionalBatch batch = TransactionalBatch.createTransactionalBatch(this.getPartitionKey(this.partitionKey1)); + batch.createItemOperation(testDocToCreate); + batch.replaceItemOperation(testDocToReplace.getId(), testDocToReplace, firstReplaceOptions); + + TransactionalBatchResponse batchResponse = container.executeTransactionalBatch(batch); + + this.verifyBatchProcessed(batchResponse, 2); + + assertThat(batchResponse.getResults().get(0).getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); + assertThat(batchResponse.getResults().get(1).getStatusCode()).isEqualTo(HttpResponseStatus.OK.code()); + + // Ensure that the replace overwrote the doc from the first operation + this.verifyByRead(container, testDocToCreate, batchResponse.getResults().get(0).getETag()); + this.verifyByRead(container, testDocToReplace, batchResponse.getResults().get(1).getETag()); + } + + { + TestDoc testDocToReplace = this.getTestDocCopy(this.TestDocPk1ExistingB); + testDocToReplace.setCost(testDocToReplace.getCost() + 1); + + TransactionalBatchItemRequestOptions replaceOptions = new TransactionalBatchItemRequestOptions(); + replaceOptions.setIfMatchETag(String.valueOf(this.getRandom().nextInt())); + + TransactionalBatch batch = TransactionalBatch.createTransactionalBatch(this.getPartitionKey(this.partitionKey1)); + batch.replaceItemOperation(testDocToReplace.getId(), testDocToReplace, replaceOptions); + + TransactionalBatchResponse batchResponse = container.executeTransactionalBatch(batch); + + this.verifyBatchProcessed(batchResponse, 1, HttpResponseStatus.PRECONDITION_FAILED); + + assertThat(batchResponse.getResults().get(0).getStatusCode()).isEqualTo(HttpResponseStatus.PRECONDITION_FAILED.code()); + + // ensure the item was not updated + this.verifyByRead(container, this.TestDocPk1ExistingB); + } + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT) + public void batchErrorSessionToken() { + CosmosContainer container = batchContainer; + this.createJsonTestDocs(container); + + ISessionToken readResponseNotExistsToken = null; + try { + container.readItem( + UUID.randomUUID().toString(), + this.getPartitionKey(this.partitionKey1), + TestDoc.class); + } catch (CosmosException ex) { + readResponseNotExistsToken = this.getSessionToken(ex.getResponseHeaders().get(HttpConstants.HttpHeaders.SESSION_TOKEN)); + + // When this is changed to return non null, batch needs to be modified too. + String ownerIdRead = ex.getResponseHeaders().get(HttpConstants.HttpHeaders.OWNER_ID); + assertThat(ownerIdRead).isNull(); + } + + { + // Only errored read + TransactionalBatch batch = TransactionalBatch.createTransactionalBatch(this.getPartitionKey(this.partitionKey1)); + batch.readItemOperation(UUID.randomUUID().toString()); + + TransactionalBatchResponse batchResponse = container.executeTransactionalBatch(batch); + + assertThat(batchResponse.getStatusCode()).isEqualTo(HttpResponseStatus.NOT_FOUND.code()); + assertThat(batchResponse.getResults().get(0).getStatusCode()).isEqualTo(HttpResponseStatus.NOT_FOUND.code()); + + String ownerIdBatch = batchResponse.getResponseHeaders().get(HttpConstants.HttpHeaders.OWNER_ID); + assertThat(ownerIdBatch).isNull(); + + ISessionToken batchResponseToken = this.getSessionToken(batchResponse.getSessionToken()); + + assertThat(batchResponseToken.getLSN()) + .as("Response session token should be more than or equal to request session token") + .isGreaterThanOrEqualTo(readResponseNotExistsToken.getLSN()); + } + + { + // One valid read one error read + TransactionalBatch batch = TransactionalBatch.createTransactionalBatch(this.getPartitionKey(this.partitionKey1)); + batch.readItemOperation(this.TestDocPk1ExistingA.getId()); + batch.readItemOperation(UUID.randomUUID().toString()); + + TransactionalBatchResponse batchResponse = container.executeTransactionalBatch(batch); + + assertThat(batchResponse.getStatusCode()).isEqualTo(HttpResponseStatus.NOT_FOUND.code()); + assertThat(batchResponse.getResults().get(0).getStatusCode()).isEqualTo(HttpResponseStatus.FAILED_DEPENDENCY.code()); + assertThat(batchResponse.getResults().get(1).getStatusCode()).isEqualTo(HttpResponseStatus.NOT_FOUND.code()); + + String ownerIdBatch = batchResponse.getResponseHeaders().get(HttpConstants.HttpHeaders.OWNER_ID); + assertThat(ownerIdBatch).isNull(); + + ISessionToken batchResponseToken = this.getSessionToken(batchResponse.getSessionToken()); + + assertThat(batchResponseToken.getLSN()) + .as("Response session token should be more than or equal to request session token") + .isGreaterThanOrEqualTo(readResponseNotExistsToken.getLSN()); + } + + { + // One error one valid read + TransactionalBatch batch = TransactionalBatch.createTransactionalBatch(this.getPartitionKey(this.partitionKey1)); + batch.readItemOperation(UUID.randomUUID().toString()); + batch.readItemOperation(this.TestDocPk1ExistingA.getId()); + + TransactionalBatchResponse batchResponse = container.executeTransactionalBatch(batch); + + assertThat(batchResponse.getStatusCode()).isEqualTo(HttpResponseStatus.NOT_FOUND.code()); + assertThat(batchResponse.getResults().get(0).getStatusCode()).isEqualTo(HttpResponseStatus.NOT_FOUND.code()); + assertThat(batchResponse.getResults().get(1).getStatusCode()).isEqualTo(HttpResponseStatus.FAILED_DEPENDENCY.code()); + + String ownerIdBatch = batchResponse.getResponseHeaders().get(HttpConstants.HttpHeaders.OWNER_ID); + assertThat(ownerIdBatch).isNull(); + + ISessionToken batchResponseToken = this.getSessionToken(batchResponse.getSessionToken()); + + assertThat(batchResponseToken.getLSN()) + .as("Response session token should be more than or equal to request session token") + .isGreaterThanOrEqualTo(readResponseNotExistsToken.getLSN()); + } + + { + // One valid write and one error + TestDoc testDocToCreate = this.populateTestDoc(this.partitionKey1); + + TransactionalBatch batch = TransactionalBatch.createTransactionalBatch(this.getPartitionKey(this.partitionKey1)); + batch.createItemOperation(testDocToCreate); + batch.readItemOperation(UUID.randomUUID().toString()); + + TransactionalBatchResponse batchResponse = container.executeTransactionalBatch(batch); + + assertThat(batchResponse.getStatusCode()).isEqualTo(HttpResponseStatus.NOT_FOUND.code()); + assertThat(batchResponse.getResults().get(0).getStatusCode()).isEqualTo(HttpResponseStatus.FAILED_DEPENDENCY.code()); + assertThat(batchResponse.getResults().get(1).getStatusCode()).isEqualTo(HttpResponseStatus.NOT_FOUND.code()); + + String ownerIdBatch = batchResponse.getResponseHeaders().get(HttpConstants.HttpHeaders.OWNER_ID); + assertThat(ownerIdBatch).isNull(); + + ISessionToken batchResponseToken = this.getSessionToken(batchResponse.getSessionToken()); + + assertThat(batchResponseToken.getLSN()) + .as("Response session token should be more than or equal to request session token") + .isGreaterThanOrEqualTo(readResponseNotExistsToken.getLSN()); + } + + { + // One error one valid write + TestDoc testDocToCreate = this.populateTestDoc(this.partitionKey1); + TransactionalBatch batch = TransactionalBatch.createTransactionalBatch(this.getPartitionKey(this.partitionKey1)); + batch.readItemOperation(UUID.randomUUID().toString()); + batch.createItemOperation(testDocToCreate); + + TransactionalBatchResponse batchResponse = container.executeTransactionalBatch(batch); + + assertThat(batchResponse.getStatusCode()).isEqualTo(HttpResponseStatus.NOT_FOUND.code()); + assertThat(batchResponse.getResults().get(0).getStatusCode()).isEqualTo(HttpResponseStatus.NOT_FOUND.code()); + assertThat(batchResponse.getResults().get(1).getStatusCode()).isEqualTo(HttpResponseStatus.FAILED_DEPENDENCY.code()); + + String ownerIdBatch = batchResponse.getResponseHeaders().get(HttpConstants.HttpHeaders.OWNER_ID); + assertThat(ownerIdBatch).isNull(); + + ISessionToken batchResponseToken = this.getSessionToken(batchResponse.getSessionToken()); + + assertThat(batchResponseToken.getLSN()) + .as("Response session token should be more than or equal to request session token") + .isGreaterThanOrEqualTo(readResponseNotExistsToken.getLSN()); + } + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT) + public void batchWithTooManyOperationsTest() { + int operationCount = MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST + 1; + + // Increase the doc size by a bit so all docs won't fit in one server request. + TransactionalBatch batch = TransactionalBatch.createTransactionalBatch(this.getPartitionKey(this.partitionKey1)); + + for (int i = 0; i < operationCount; i++) { + batch.readItemOperation("someId"); + } + + try { + batchContainer.executeTransactionalBatch(batch); + Assertions.fail("Should throw bad request exception"); + } catch (CosmosException ex) { + assertThat(ex.getStatusCode()).isEqualTo(HttpResponseStatus.BAD_REQUEST.code()); + } + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT * 10) + public void batchLargerThanServerRequest() { + int operationCount = 20; + int appxDocSize = (MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES * 11) / operationCount; + + // Increase the doc size by a bit so all docs won't fit in one server request. + appxDocSize = (int)(appxDocSize * 1.05); + TransactionalBatch batch = TransactionalBatch.createTransactionalBatch(this.getPartitionKey(this.partitionKey1)); + + for (int i = 0; i < operationCount; i++) { + TestDoc doc = this.populateTestDoc(this.partitionKey1, appxDocSize); + batch.createItemOperation(doc); + } + + try { + batchContainer.executeTransactionalBatch(batch); + Assertions.fail("Should throw REQUEST_ENTITY_TOO_LARGE exception"); + } catch (CosmosException ex) { + assertThat(ex.getStatusCode()).isEqualTo(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE.code()); + } + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT * 10) + public void batchServerResponseTooLarge() { + int operationCount = 10; + int appxDocSizeInBytes = 1 * 1024 * 1024; + + TestDoc doc = this.createJsonTestDoc(batchContainer, this.partitionKey1, appxDocSizeInBytes); + + TransactionalBatch batch = TransactionalBatch.createTransactionalBatch(this.getPartitionKey(this.partitionKey1)); + for (int i = 0; i < operationCount; i++) { + batch.readItemOperation(doc.getId()); + } + + TransactionalBatchResponse batchResponse = batchContainer.executeTransactionalBatch(batch); + assertThat(batchResponse.getStatusCode()).isEqualTo(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE.code()); + assertThat(batchResponse.getResults().get(1).getStatusCode()).isEqualTo(HttpResponseStatus.FAILED_DEPENDENCY.code()); + + List batchOperations = batch.getOperations(); + for (int index = 0; index < batchOperations.size(); index++) { + assertThat(batchResponse.getResults().get(index).getOperation()).isEqualTo(batchOperations.get(index)); + } + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT) + public void batchReadsOnlyTest() { + CosmosContainer container = batchContainer; + this.createJsonTestDocs(container); + + TransactionalBatch batch = TransactionalBatch.createTransactionalBatch(this.getPartitionKey(this.partitionKey1)); + batch.readItemOperation(this.TestDocPk1ExistingA.getId()); + batch.readItemOperation(this.TestDocPk1ExistingB.getId()); + batch.readItemOperation(this.TestDocPk1ExistingC.getId()); + + TransactionalBatchResponse batchResponse = container.executeTransactionalBatch(batch); + + this.verifyBatchProcessed(batchResponse, 3); + + assertThat(batchResponse.getResults().get(0).getStatusCode()).isEqualTo(HttpResponseStatus.OK.code()); + assertThat(batchResponse.getResults().get(1).getStatusCode()).isEqualTo(HttpResponseStatus.OK.code()); + assertThat(batchResponse.getResults().get(2).getStatusCode()).isEqualTo(HttpResponseStatus.OK.code()); + + assertThat(batchResponse.getResults().get(0).getItem(TestDoc.class)).isEqualTo(this.TestDocPk1ExistingA); + assertThat(batchResponse.getResults().get(1).getItem(TestDoc.class)).isEqualTo(this.TestDocPk1ExistingB); + assertThat(batchResponse.getResults().get(2).getItem(TestDoc.class)).isEqualTo(this.TestDocPk1ExistingC); + + List batchOperations = batch.getOperations(); + for (int index = 0; index < batchOperations.size(); index++) { + assertThat(batchResponse.getResults().get(index).getOperation()).isEqualTo(batchOperations.get(index)); + } + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT) + public void batchCrud() { + CosmosContainer container = batchContainer; + this.createJsonTestDocs(container); + + BatchTestBase.TestDoc testDocToCreate = this.populateTestDoc(this.partitionKey1); + BatchTestBase.TestDoc testDocToUpsert = this.populateTestDoc(this.partitionKey1); + + BatchTestBase.TestDoc anotherTestDocToUpsert = this.getTestDocCopy(this.TestDocPk1ExistingA); + anotherTestDocToUpsert.setCost(anotherTestDocToUpsert.getCost() + 1); + + BatchTestBase.TestDoc testDocToReplace = this.getTestDocCopy(this.TestDocPk1ExistingB); + testDocToReplace.setCost(testDocToReplace.getCost() + 1); + + TransactionalBatch batch = TransactionalBatch.createTransactionalBatch(this.getPartitionKey(this.partitionKey1)); + batch.createItemOperation(testDocToCreate); + batch.readItemOperation(this.TestDocPk1ExistingC.getId()); + batch.replaceItemOperation(testDocToReplace.getId(), testDocToReplace); + batch.upsertItemOperation(testDocToUpsert); + batch.upsertItemOperation(anotherTestDocToUpsert); + batch.deleteItemOperation(this.TestDocPk1ExistingD.getId()); + + // We run CRUD operations where all are expected to return HTTP 2xx. + TransactionalBatchResponse batchResponse = container.executeTransactionalBatch(batch); + + this.verifyBatchProcessed(batchResponse, 6); + + assertThat(batchResponse.getResults().get(0).getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); + assertThat(batchResponse.getResults().get(1).getStatusCode()).isEqualTo(HttpResponseStatus.OK.code()); + assertThat(batchResponse.getResults().get(2).getStatusCode()).isEqualTo(HttpResponseStatus.OK.code()); + assertThat(batchResponse.getResults().get(3).getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); + assertThat(batchResponse.getResults().get(4).getStatusCode()).isEqualTo(HttpResponseStatus.OK.code()); + assertThat(batchResponse.getResults().get(5).getStatusCode()).isEqualTo(HttpResponseStatus.NO_CONTENT.code()); + + List batchOperations = batch.getOperations(); + for (int index = 0; index < batchOperations.size(); index++) { + assertThat(batchResponse.getResults().get(index).getOperation()).isEqualTo(batchOperations.get(index)); + } + + assertThat(batchResponse.getResults().get(1).getItem(TestDoc.class)).isEqualTo(this.TestDocPk1ExistingC); + + this.verifyByRead(container, testDocToCreate); + this.verifyByRead(container, testDocToReplace); + this.verifyByRead(container, testDocToUpsert); + this.verifyByRead(container, anotherTestDocToUpsert); + this.verifyNotFound(container, this.TestDocPk1ExistingD); + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT) + public void batchWithInvalidCreateTest() { + // partition key mismatch between doc and and value passed in to the operation + this.runWithError( + batchContainer, + batch -> batch.createItemOperation(this.populateTestDoc(UUID.randomUUID().toString())), + HttpResponseStatus.BAD_REQUEST); + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT) + public void batchWithReadOfNonExistentEntityTest() { + this.runWithError( + batchContainer, + batch -> batch.readItemOperation(UUID.randomUUID().toString()), + HttpResponseStatus.NOT_FOUND); + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT) + public void batchWithReplaceOfStaleEntity() { + this.createJsonTestDocs(batchContainer); + + TestDoc staleTestDocToReplace = this.getTestDocCopy(this.TestDocPk1ExistingA); + staleTestDocToReplace.setCost(staleTestDocToReplace.getCost() + 1); + + TransactionalBatchItemRequestOptions staleReplaceOptions = new TransactionalBatchItemRequestOptions(); + staleReplaceOptions.setIfMatchETag(UUID.randomUUID().toString()); + + this.runWithError( + batchContainer, + batch -> batch.replaceItemOperation(staleTestDocToReplace.getId(), staleTestDocToReplace, staleReplaceOptions), + HttpResponseStatus.PRECONDITION_FAILED); + + // make sure the stale doc hasn't changed + this.verifyByRead(batchContainer, this.TestDocPk1ExistingA); + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT) + public void batchWithDeleteOfNonExistentEntity() { + this.runWithError( + batchContainer, + batch -> batch.deleteItemOperation(UUID.randomUUID().toString()), + HttpResponseStatus.NOT_FOUND); + } + + @Test(groups = {"simple"}, timeOut = TIMEOUT) + public void batchWithCreateConflict() { + this.createJsonTestDocs(batchContainer); + + // try to create a doc with id that already exists (should return a Conflict) + TestDoc conflictingTestDocToCreate = this.getTestDocCopy(this.TestDocPk1ExistingA); + conflictingTestDocToCreate.setCost(conflictingTestDocToCreate.getCost()); + + this.runWithError( + batchContainer, + batch -> batch.createItemOperation(conflictingTestDocToCreate), + HttpResponseStatus.CONFLICT); + + // make sure the conflicted doc hasn't changed + this.verifyByRead(batchContainer, this.TestDocPk1ExistingA); + } + + + private void runWithError( + CosmosContainer container, + Function appendOperation, + HttpResponseStatus expectedFailedOperationStatusCode) { + + TestDoc testDocToCreate = this.populateTestDoc(this.partitionKey1); + TestDoc anotherTestDocToCreate = this.populateTestDoc(this.partitionKey1); + + TransactionalBatch batch = TransactionalBatch.createTransactionalBatch(this.getPartitionKey(this.partitionKey1)); + batch.createItemOperation(testDocToCreate); + + appendOperation.apply(batch); + + batch.createItemOperation(anotherTestDocToCreate); + + TransactionalBatchResponse batchResponse = container.executeTransactionalBatch(batch); + + this.verifyBatchProcessed(batchResponse, 3, expectedFailedOperationStatusCode); + + assertThat(batchResponse.getResults().get(0).getStatusCode()).isEqualTo(HttpResponseStatus.FAILED_DEPENDENCY.code()); + assertThat(batchResponse.getResults().get(1).getStatusCode()).isEqualTo(expectedFailedOperationStatusCode.code()); + assertThat(batchResponse.getResults().get(2).getStatusCode()).isEqualTo(HttpResponseStatus.FAILED_DEPENDENCY.code()); + + List batchOperations = batch.getOperations(); + for (int index = 0; index < batchOperations.size(); index++) { + assertThat(batchResponse.getResults().get(index).getOperation()).isEqualTo(batchOperations.get(index)); + } + + this.verifyNotFound(container, testDocToCreate); + this.verifyNotFound(container, anotherTestDocToCreate); + } +} diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/batch/BatchResponsePayloadWriter.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/batch/BatchResponsePayloadWriter.java new file mode 100644 index 000000000000..4f850dc84c88 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/batch/BatchResponsePayloadWriter.java @@ -0,0 +1,50 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.batch; + +import com.azure.cosmos.TransactionalBatchOperationResult; +import com.azure.cosmos.implementation.JsonSerializable; +import com.azure.cosmos.implementation.Utils; +import com.fasterxml.jackson.databind.node.ArrayNode; + +import java.util.List; + +class BatchResponsePayloadWriter { + + private List results; + + BatchResponsePayloadWriter(List results) { + this.results = results; + } + + String generatePayload() { + return writeOperationResult().toString(); + } + + private ArrayNode writeOperationResult() { + ArrayNode arrayNode = Utils.getSimpleObjectMapper().createArrayNode(); + + for(TransactionalBatchOperationResult result : results) { + JsonSerializable operationJsonSerializable = writeResult(result); + + arrayNode.add(operationJsonSerializable.getPropertyBag()); + } + return arrayNode; + } + + private JsonSerializable writeResult(TransactionalBatchOperationResult result) { + + JsonSerializable jsonSerializable = new JsonSerializable(); + jsonSerializable.set(BatchRequestResponseConstant.FIELD_STATUS_CODE, result.getStatusCode()); + jsonSerializable.set(BatchRequestResponseConstant.FIELD_SUBSTATUS_CODE, result.getSubStatusCode()); + jsonSerializable.set(BatchRequestResponseConstant.FIELD_ETAG, result.getETag()); + jsonSerializable.set(BatchRequestResponseConstant.FIELD_REQUEST_CHARGE, result.getRequestCharge()); + + if(result.getRetryAfterDuration() != null) { + jsonSerializable.set(BatchRequestResponseConstant.FIELD_RETRY_AFTER_MILLISECONDS, result.getRetryAfterDuration().toMillis()); + } + + return jsonSerializable; + } +} diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/batch/TransactionalBatchResponseTests.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/batch/TransactionalBatchResponseTests.java new file mode 100644 index 000000000000..a72173db90ce --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/batch/TransactionalBatchResponseTests.java @@ -0,0 +1,160 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.batch; + +import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.CosmosItemOperationType; +import com.azure.cosmos.TransactionalBatchOperationResult; +import com.azure.cosmos.TransactionalBatchResponse; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.RxDocumentServiceResponse; +import com.azure.cosmos.implementation.directconnectivity.StoreResponse; +import com.azure.cosmos.models.PartitionKey; +import io.netty.handler.codec.http.HttpResponseStatus; +import org.testng.annotations.Test; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +public class TransactionalBatchResponseTests { + + private static final int TIMEOUT = 40000; + + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void validateAllSetValuesInResponse() { + List results = new ArrayList<>(); + ItemBatchOperation[] arrayOperations = new ItemBatchOperation[1]; + + ItemBatchOperation operation = new ItemBatchOperation<>( + CosmosItemOperationType.READ, + "0", + PartitionKey.NONE, + null, + null + ); + + arrayOperations[0] = operation; + SinglePartitionKeyServerBatchRequest batchRequest = SinglePartitionKeyServerBatchRequest.createBatchRequest( + PartitionKey.NONE, + Arrays.asList(arrayOperations)); + + // Create dummy result + TransactionalBatchOperationResult transactionalBatchOperationResult = BridgeInternal.createTransactionBatchResult( + operation.getId(), + 5.0, + null, + HttpResponseStatus.NOT_MODIFIED.code(), + Duration.ofMillis(100), + HttpConstants.SubStatusCodes.PARTITION_KEY_MISMATCH, + operation + ); + + results.add(transactionalBatchOperationResult); + String responseContent = new BatchResponsePayloadWriter(results).generatePayload(); + + // TransactionalBatchResponse headers + String activityId = UUID.randomUUID().toString(); + Map headers = new HashMap<>(); + headers.put(HttpConstants.HttpHeaders.ACTIVITY_ID, activityId); + headers.put(HttpConstants.HttpHeaders.REQUEST_CHARGE, "4.5"); + headers.put(HttpConstants.HttpHeaders.SESSION_TOKEN, "token123"); + headers.put(HttpConstants.HttpHeaders.RETRY_AFTER_IN_MILLISECONDS, "1234"); + headers.put(HttpConstants.HttpHeaders.SUB_STATUS, String.valueOf(HttpConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE)); + + StoreResponse storeResponse = new StoreResponse( + HttpResponseStatus.OK.code(), + new ArrayList<>(headers.entrySet()), + responseContent.getBytes(StandardCharsets.UTF_8)); + + TransactionalBatchResponse batchResponse = BatchResponseParser.fromDocumentServiceResponse( + new RxDocumentServiceResponse(null, storeResponse), + batchRequest, + true); + + // Validate response fields + assertThat(batchResponse.getActivityId()).isEqualTo(activityId); + assertThat(batchResponse.getRequestCharge()).isEqualTo(4.5); + assertThat(batchResponse.getStatusCode()).isEqualTo(HttpResponseStatus.OK.code()); + assertThat(batchResponse.getSessionToken()).isEqualTo("token123"); + assertThat(batchResponse.getResponseHeaders()).isEqualTo(headers); + assertThat(batchResponse.getRetryAfterDuration()).isEqualTo(Duration.ofMillis(1234)); + assertThat(batchResponse.getSubStatusCode()).isEqualTo(HttpConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE); + + // Validate result fields + assertThat(batchResponse.getResults().get(0).getETag()).isEqualTo(operation.getId()); + assertThat(batchResponse.getResults().get(0).getRequestCharge()).isEqualTo(5.0); + assertThat(batchResponse.getResults().get(0).getRetryAfterDuration()).isEqualTo(Duration.ofMillis(100)); + assertThat(batchResponse.getResults().get(0).getSubStatusCode()).isEqualTo(HttpConstants.SubStatusCodes.PARTITION_KEY_MISMATCH); + assertThat(batchResponse.getResults().get(0).getStatusCode()).isEqualTo(HttpResponseStatus.NOT_MODIFIED.code()); + assertThat(batchResponse.getResults().get(0).getOperation()).isEqualTo(operation); + } + + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void validateEmptyHeaderInResponse() { + List results = new ArrayList<>(); + ItemBatchOperation[] arrayOperations = new ItemBatchOperation[1]; + + ItemBatchOperation operation = new ItemBatchOperation<>( + CosmosItemOperationType.READ, + "0", + PartitionKey.NONE, + null, + null + ); + + arrayOperations[0] = operation; + SinglePartitionKeyServerBatchRequest batchRequest = SinglePartitionKeyServerBatchRequest.createBatchRequest( + PartitionKey.NONE, + Arrays.asList(arrayOperations)); + + // Create dummy result + TransactionalBatchOperationResult transactionalBatchOperationResult = BridgeInternal.createTransactionBatchResult( + null, + 5.0, + null, + HttpResponseStatus.NOT_MODIFIED.code(), + null, + 0, + operation + ); + + results.add(transactionalBatchOperationResult); + String responseContent = new BatchResponsePayloadWriter(results).generatePayload(); + + StoreResponse storeResponse = new StoreResponse( + HttpResponseStatus.OK.code(), + new ArrayList<>(), + responseContent.getBytes(StandardCharsets.UTF_8)); + + TransactionalBatchResponse batchResponse = BatchResponseParser.fromDocumentServiceResponse( + new RxDocumentServiceResponse(null, storeResponse), + batchRequest, + true); + + // Validate response fields + assertThat(batchResponse.getStatusCode()).isEqualTo(HttpResponseStatus.OK.code()); + assertThat(batchResponse.getActivityId()).isNull(); + assertThat(batchResponse.getRequestCharge()).isEqualTo(0); + assertThat(batchResponse.getSessionToken()).isNull(); + assertThat(batchResponse.getResponseHeaders()).isEmpty(); + assertThat(batchResponse.getRetryAfterDuration()).isEqualTo(Duration.ZERO); + assertThat(batchResponse.getSubStatusCode()).isEqualTo(0); + + // Validate result fields + assertThat(batchResponse.getResults().get(0).getETag()).isNull(); + assertThat(batchResponse.getResults().get(0).getRequestCharge()).isEqualTo(5.0); + assertThat(batchResponse.getResults().get(0).getRetryAfterDuration()).isEqualTo(Duration.ZERO); + assertThat(batchResponse.getResults().get(0).getSubStatusCode()).isEqualTo(0); + assertThat(batchResponse.getResults().get(0).getStatusCode()).isEqualTo(HttpResponseStatus.NOT_MODIFIED.code()); + assertThat(batchResponse.getResults().get(0).getOperation()).isEqualTo(operation); + } +}