Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transactional batch for Java #15775

Merged
merged 31 commits into from
Oct 12, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
1ce5af2
Transactional batch seperate
rakkuma Sep 28, 2020
d886539
Code review changes
rakkuma Sep 30, 2020
752ab93
Merge branch 'master' into users/rakkuma/transactional-batch
rakkuma Sep 30, 2020
a8103d8
Few more fixes
rakkuma Sep 30, 2020
57868b5
Merging with master
rakkuma Sep 30, 2020
55ac0c5
Fix
rakkuma Sep 30, 2020
136c20c
fix
rakkuma Oct 1, 2020
24633c9
Added issue link
rakkuma Oct 1, 2020
9d0bdee
Merge branch 'master' into users/rakkuma/transactional-batch
rakkuma Oct 1, 2020
6cd9363
beta version change
rakkuma Oct 1, 2020
d9500e0
Add Beta change to public classes
rakkuma Oct 1, 2020
5afc339
Minor fix
rakkuma Oct 1, 2020
35dcc74
Merge branch 'master' into users/rakkuma/transactional-batch
rakkuma Oct 1, 2020
47353b5
Code review
rakkuma Oct 5, 2020
8413c9e
Merge branch 'master' into users/rakkuma/transactional-batch
rakkuma Oct 5, 2020
4396046
Fix
rakkuma Oct 5, 2020
3e8dbc1
Minor fix
rakkuma Oct 5, 2020
ed02b3e
Session token fix and test cases
rakkuma Oct 6, 2020
1d7fb3b
Merge branch 'master' into users/rakkuma/transactional-batch
rakkuma Oct 6, 2020
6d8ea0d
code review changes
rakkuma Oct 6, 2020
22633c1
Merge branch 'master' into users/rakkuma/transactional-batch
rakkuma Oct 6, 2020
04ca239
Removing one test case
rakkuma Oct 7, 2020
2735e33
test
rakkuma Oct 7, 2020
3fc3eef
Merge branch 'master' into users/rakkuma/transactional-batch
rakkuma Oct 7, 2020
d605ba1
Fix
rakkuma Oct 7, 2020
668c0e3
API change
rakkuma Oct 9, 2020
269c13c
Merge branch 'master' into users/rakkuma/transactional-batch
rakkuma Oct 9, 2020
e67348a
Adding CosmosItemoperation interface
rakkuma Oct 9, 2020
17ea44b
Fixing comment
rakkuma Oct 9, 2020
4c4893d
Fix
rakkuma Oct 9, 2020
b707495
Reverting byte buffer change
rakkuma Oct 9, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.azure.cosmos.implementation.TracerProvider;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.batch.BatchExecutor;
import com.azure.cosmos.implementation.query.QueryInfo;
import com.azure.cosmos.models.CosmosConflictProperties;
import com.azure.cosmos.models.CosmosContainerProperties;
Expand Down Expand Up @@ -67,6 +68,7 @@ public class CosmosAsyncContainer {
private final String queryItemsSpanName;
private final String readAllConflictsSpanName;
private final String queryConflictsSpanName;
private final String batchSpanName;
private CosmosAsyncScripts scripts;

CosmosAsyncContainer(String id, CosmosAsyncDatabase database) {
Expand All @@ -87,6 +89,7 @@ public class CosmosAsyncContainer {
this.queryItemsSpanName = "queryItems." + this.id;
this.readAllConflictsSpanName = "readAllConflicts." + this.id;
this.queryConflictsSpanName = "queryConflicts." + this.id;
this.batchSpanName = "transactionalBatch." + this.id;
}

/**
Expand Down Expand Up @@ -493,6 +496,88 @@ private <T> T transform(Object object, Class<T> classType) {
return Utils.getSimpleObjectMapper().convertValue(object, classType);
}

/**
* Executes the transactional batch at the Azure Cosmos service as an asynchronous operation.
*
* @param transactionalBatch Batch having list of operation and partition key which will be executed by this container.
*
* @return A Mono response which contains details of execution of the transactional batch.
* <p>
* If the transactional batch executes successfully, the value returned by {@link
* TransactionalBatchResponse#getResponseStatus} on the response returned will be set to 200}.
* <p>
* If an operation within the transactional batch fails during execution, no changes from the batch will be
* committed and the status of the failing operation is made available by {@link
* TransactionalBatchResponse#getResponseStatus}. To obtain information about the operations that failed, the
* response can be enumerated. This returns {@link TransactionalBatchOperationResult} instances corresponding to
* each operation in the transactional batch in the order they were added to the transactional batch. For a result
* corresponding to an operation within the transactional batch, use
* {@link TransactionalBatchOperationResult#getResponseStatus}
* to access the status of the operation. If the operation was not executed or it was aborted due to the failure of
* another operation within the transactional batch, the value of this field will be 424;
* for the operation that caused the batch to abort, the value of this field
* will indicate the cause of failure.
* <p>
* The value returned by {@link TransactionalBatchResponse#getResponseStatus} on the response returned may also have
* values such as 500 in case of server errors and 429.
* <p>
* Use {@link TransactionalBatchResponse#isSuccessStatusCode} on the response returned to ensure that the
* transactional batch succeeded.
*/
@Beta(Beta.SinceVersion.V4_4_0)
public Mono<TransactionalBatchResponse> executeTransactionalBatch(TransactionalBatch transactionalBatch) {
return executeTransactionalBatch(transactionalBatch, new TransactionalBatchRequestOptions());
}

/**
* Executes the transactional batch at the Azure Cosmos service as an asynchronous operation.
*
* @param transactionalBatch Batch having list of operation and partition key which will be executed by this container.
* @param requestOptions Options that apply specifically to batch request.
*
* @return A Mono response which contains details of execution of the transactional batch.
* <p>
* If the transactional batch executes successfully, the value returned by {@link
* TransactionalBatchResponse#getResponseStatus} on the response returned will be set to 200}.
* <p>
* If an operation within the transactional batch fails during execution, no changes from the batch will be
* committed and the status of the failing operation is made available by {@link
* TransactionalBatchResponse#getResponseStatus}. To obtain information about the operations that failed, the
* response can be enumerated. This returns {@link TransactionalBatchOperationResult} instances corresponding to
* each operation in the transactional batch in the order they were added to the transactional batch. For a result
* corresponding to an operation within the transactional batch, use
* {@link TransactionalBatchOperationResult#getResponseStatus}
* to access the status of the operation. If the operation was not executed or it was aborted due to the failure of
* another operation within the transactional batch, the value of this field will be 424;
* for the operation that caused the batch to abort, the value of this field
* will indicate the cause of failure.
* <p>
* The value returned by {@link TransactionalBatchResponse#getResponseStatus} on the response returned may also have
* values such as 500 in case of server errors and 429.
* <p>
* Use {@link TransactionalBatchResponse#isSuccessStatusCode} on the response returned to ensure that the
* transactional batch succeeded.
*/
@Beta(Beta.SinceVersion.V4_4_0)
public Mono<TransactionalBatchResponse> executeTransactionalBatch(
TransactionalBatch transactionalBatch,
TransactionalBatchRequestOptions requestOptions) {

if (requestOptions == null) {
requestOptions = new TransactionalBatchRequestOptions();
}

final BatchExecutor executor = new BatchExecutor(this, transactionalBatch, requestOptions);
final Mono<TransactionalBatchResponse> responseMono = executor.executeAsync();

return withContext(context -> database.getClient().getTracerProvider().
traceEnabledBatchResponsePublisher(responseMono,
context,
this.batchSpanName,
database.getId(),
database.getClient().getServiceEndpoint()));
}

/**
* Reads an item.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,19 @@ private CosmosItemResponse<Object> blockDeleteItemResponse(Mono<CosmosItemRespon
}
}

private TransactionalBatchResponse blockBatchResponse(Mono<TransactionalBatchResponse> batchResponseMono) {
try {
return batchResponseMono.block();
} catch (Exception ex) {
final Throwable throwable = Exceptions.unwrap(ex);
if (throwable instanceof CosmosException) {
throw (CosmosException) throwable;
} else {
throw ex;
}
}
}

/**
* Read all items as {@link CosmosPagedIterable} in the current container.
*
Expand Down Expand Up @@ -453,6 +466,76 @@ public <T> CosmosItemResponse<Object> deleteItem(T item, CosmosItemRequestOption
return this.blockDeleteItemResponse(asyncContainer.deleteItem(item, options));
}

/**
* Executes the transactional batch at the Azure Cosmos service as an asynchronous operation.
*
* @param transactionalBatch Batch having list of operation and partition key which will be executed by this container.
*
* @return A Mono response which contains details of execution of the transactional batch.
* <p>
* If the transactional batch executes successfully, the value returned by {@link
* TransactionalBatchResponse#getResponseStatus} on the response returned will be set to 200}.
* <p>
* If an operation within the transactional batch fails during execution, no changes from the batch will be
* committed and the status of the failing operation is made available by {@link
* TransactionalBatchResponse#getResponseStatus}. To obtain information about the operations that failed, the
* response can be enumerated. This returns {@link TransactionalBatchOperationResult} instances corresponding to
* each operation in the transactional batch in the order they were added to the transactional batch. For a result
* corresponding to an operation within the transactional batch, use
* {@link TransactionalBatchOperationResult#getResponseStatus}
* to access the status of the operation. If the operation was not executed or it was aborted due to the failure of
* another operation within the transactional batch, the value of this field will be 424;
* for the operation that caused the batch to abort, the value of this field
* will indicate the cause of failure.
* <p>
* The value returned by {@link TransactionalBatchResponse#getResponseStatus} on the response returned may also have
* values such as 500 in case of server errors and 429.
* <p>
* Use {@link TransactionalBatchResponse#isSuccessStatusCode} on the response returned to ensure that the
* transactional batch succeeded.
*/
@Beta(Beta.SinceVersion.V4_4_0)
public TransactionalBatchResponse executeTransactionalBatch(TransactionalBatch transactionalBatch) {
return this.blockBatchResponse(asyncContainer.executeTransactionalBatch(transactionalBatch));
}

/**
* Executes the transactional batch at the Azure Cosmos service as an asynchronous operation.
*
* @param transactionalBatch Batch having list of operation and partition key which will be executed by this container.
* @param requestOptions Options that apply specifically to batch request.
*
* @return A Mono response which contains details of execution of the transactional batch.
* <p>
* If the transactional batch executes successfully, the value returned by {@link
* TransactionalBatchResponse#getResponseStatus} on the response returned will be set to 200}.
* <p>
* If an operation within the transactional batch fails during execution, no changes from the batch will be
* committed and the status of the failing operation is made available by {@link
* TransactionalBatchResponse#getResponseStatus}. To obtain information about the operations that failed, the
* response can be enumerated. This returns {@link TransactionalBatchOperationResult} instances corresponding to
* each operation in the transactional batch in the order they were added to the transactional batch. For a result
* corresponding to an operation within the transactional batch, use
* {@link TransactionalBatchOperationResult#getResponseStatus}
* to access the status of the operation. If the operation was not executed or it was aborted due to the failure of
* another operation within the transactional batch, the value of this field will be 424;
* for the operation that caused the batch to abort, the value of this field
* will indicate the cause of failure.
* <p>
* The value returned by {@link TransactionalBatchResponse#getResponseStatus} on the response returned may also have
* values such as 500 in case of server errors and 429.
* <p>
* Use {@link TransactionalBatchResponse#isSuccessStatusCode} on the response returned to ensure that the
* transactional batch succeeded.
*/
@Beta(Beta.SinceVersion.V4_4_0)
public TransactionalBatchResponse executeTransactionalBatch(
TransactionalBatch transactionalBatch,
TransactionalBatchRequestOptions requestOptions) {

return this.blockBatchResponse(asyncContainer.executeTransactionalBatch(transactionalBatch, requestOptions));
}

/**
* Gets the Cosmos scripts using the current container as context.
*
Expand Down
Loading