More Robust BlobBatch Implementation #6882

Merged · merged 5 commits · Dec 19, 2019
Changes from 3 commits
@@ -8,12 +8,10 @@
import com.azure.core.http.HttpPipelineBuilder;
import com.azure.core.http.HttpPipelineCallContext;
import com.azure.core.http.HttpPipelineNextPolicy;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.http.rest.Response;
import com.azure.core.util.UrlBuilder;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.BlobAsyncClient;
import com.azure.storage.blob.BlobClientBuilder;
@@ -22,24 +20,16 @@
import com.azure.storage.blob.models.DeleteSnapshotsOptionType;
import com.azure.storage.common.Utility;
import com.azure.storage.common.policy.StorageSharedKeyCredentialPolicy;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

import java.net.MalformedURLException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;

/**
* This class allows for batching of multiple Azure Storage operations in a single request via {@link
@@ -55,17 +45,9 @@
*/
public final class BlobBatch {
private static final String X_MS_VERSION = "x-ms-version";
private static final String BATCH_REQUEST_CONTENT_ID = "Batch-Request-Content-Id";
private static final String BATCH_REQUEST_URL_PATH = "Batch-Request-Url-Path";
private static final String CONTENT_ID = "Content-Id";
private static final String BATCH_BOUNDARY_TEMPLATE = "batch_%s";
private static final String REQUEST_CONTENT_TYPE_TEMPLATE = "multipart/mixed; boundary=%s";
private static final String BATCH_OPERATION_CONTENT_TYPE = "Content-Type: application/http";
private static final String BATCH_OPERATION_CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding: binary";
private static final String BATCH_OPERATION_CONTENT_ID_TEMPLATE = "Content-ID: %d";
private static final String HTTP_VERSION = "HTTP/1.1";
private static final String OPERATION_TEMPLATE = "%s %s %s";
private static final String HEADER_TEMPLATE = "%s: %s";
private static final String BATCH_OPERATION_RESPONSE = "Batch-Operation-Response";
private static final String BATCH_OPERATION_INFO = "Batch-Operation-Info";
private static final String PATH_TEMPLATE = "%s/%s";

/*
@@ -79,23 +61,12 @@ public final class BlobBatch {

private final BlobAsyncClient blobAsyncClient;

private final Deque<Mono<? extends Response<?>>> batchOperationQueue;
private final List<ByteBuffer> batchRequest;
private final Map<Integer, BlobBatchOperationResponse<?>> batchMapping;

private final AtomicInteger contentId;
private final String batchBoundary;
private final String contentType;

private Deque<BlobBatchOperation<?>> batchOperationQueue;
private BlobBatchType batchType;

BlobBatch(String accountUrl, HttpPipeline pipeline) {
this.contentId = new AtomicInteger();
this.batchBoundary = String.format(BATCH_BOUNDARY_TEMPLATE, UUID.randomUUID());
this.contentType = String.format(REQUEST_CONTENT_TYPE_TEMPLATE, batchBoundary);

boolean batchHeadersPolicySet = false;
HttpPipelineBuilder batchPipelineBuilder = new HttpPipelineBuilder().httpClient(this::setupBatchOperation);
HttpPipelineBuilder batchPipelineBuilder = new HttpPipelineBuilder();
for (int i = 0; i < pipeline.getPolicyCount(); i++) {
HttpPipelinePolicy policy = pipeline.getPolicy(i);

@@ -112,15 +83,15 @@ public final class BlobBatch {
batchPipelineBuilder.policies(this::cleanseHeaders, this::setRequestUrl);
}

batchPipelineBuilder.policies(this::buildBatchOperation);

this.blobAsyncClient = new BlobClientBuilder()
.endpoint(accountUrl)
.blobName("")
.pipeline(batchPipelineBuilder.build())
.buildAsyncClient();

this.batchOperationQueue = new ConcurrentLinkedDeque<>();
this.batchRequest = new ArrayList<>();
this.batchMapping = new ConcurrentHashMap<>();
}
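
For orientation, here is a minimal, hedged sketch of how the class above is typically consumed; it assumes the public batch API of azure-storage-blob-batch (BlobBatchClientBuilder, BlobBatchClient#getBlobBatch, BlobBatch#deleteBlob, BlobBatchClient#submitBatch), and the container/blob names and connection-string wiring are placeholders.

```java
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.batch.BlobBatch;
import com.azure.storage.blob.batch.BlobBatchClient;
import com.azure.storage.blob.batch.BlobBatchClientBuilder;

public class BlobBatchUsageSketch {
    public static void main(String[] args) {
        // Placeholder wiring; swap in a real endpoint/credential.
        BlobServiceClient serviceClient = new BlobServiceClientBuilder()
            .connectionString(System.getenv("AZURE_STORAGE_CONNECTION_STRING"))
            .buildClient();

        BlobBatchClient batchClient = new BlobBatchClientBuilder(serviceClient).buildClient();

        // One BlobBatch aggregates several sub-requests into a single HTTP request.
        BlobBatch batch = batchClient.getBlobBatch();
        batch.deleteBlob("container", "blob1");
        batch.deleteBlob("container", "blob2");

        // Submission sends the multipart/mixed body assembled by the class above.
        batchClient.submitBatch(batch);
    }
}
```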

/**
@@ -287,12 +258,9 @@ private Response<Void> setBlobAccessTierHelper(String urlPath, AccessTier access

private <T> Response<T> createBatchOperation(Mono<Response<T>> response, String urlPath,
int... expectedStatusCodes) {
int id = contentId.getAndIncrement();
batchOperationQueue.add(response
.subscriberContext(Context.of(BATCH_REQUEST_CONTENT_ID, id, BATCH_REQUEST_URL_PATH, urlPath)));

BlobBatchOperationResponse<T> batchOperationResponse = new BlobBatchOperationResponse<>(expectedStatusCodes);
batchMapping.put(id, batchOperationResponse);
batchOperationQueue.add(new BlobBatchOperation<>(batchOperationResponse, response, urlPath));

return batchOperationResponse;
}
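
BlobBatchOperation is added by this PR but is not part of this diff view. Judging from the constructor call above and the getters used in prepareBlobBatchSubmission below (getResponse, getRequestUrlPath, getBatchOperationResponse), it is presumably a plain immutable holder along these lines; this is a sketch of the inferred shape, not the actual file.

```java
import com.azure.core.http.rest.Response;
import reactor.core.publisher.Mono;

// Inferred holder for a queued batch operation (same package as BlobBatch, so it
// can reference the package-private BlobBatchOperationResponse type).
final class BlobBatchOperation<T> {
    private final BlobBatchOperationResponse<T> batchOperationResponse;
    private final Mono<Response<T>> response;
    private final String requestUrlPath;

    BlobBatchOperation(BlobBatchOperationResponse<T> batchOperationResponse,
        Mono<Response<T>> response, String requestUrlPath) {
        this.batchOperationResponse = batchOperationResponse;
        this.response = response;
        this.requestUrlPath = requestUrlPath;
    }

    BlobBatchOperationResponse<T> getBatchOperationResponse() {
        return batchOperationResponse;
    }

    Mono<Response<T>> getResponse() {
        return response;
    }

    String getRequestUrlPath() {
        return requestUrlPath;
    }
}
```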

@@ -309,49 +277,30 @@ private void setBatchType(BlobBatchType batchType) {
}
}

Flux<ByteBuffer> getBody() {
Mono<BlobBatchOperationInfo> prepareBlobBatchSubmission() {
if (batchOperationQueue.isEmpty()) {
throw logger.logExceptionAsError(new UnsupportedOperationException("Empty batch requests aren't allowed."));
}

// 'flatMap' the requests to trigger them to run through the pipeline.
Disposable disposable = Flux.fromStream(batchOperationQueue.stream())
.flatMap(batchOperation -> batchOperation)
.subscribe();

/* Wait until the 'Flux' is disposed of (aka complete) instead of blocking as this will prevent Reactor from
* throwing an exception if this was ran in a Reactor thread.
*/
while (!disposable.isDisposed()) {
// This is used as opposed to block as it won't trigger an exception if ran in a Reactor thread.
}

this.batchRequest.add(ByteBuffer.wrap(
String.format("--%s--%s", batchBoundary, BlobBatchHelper.HTTP_NEWLINE).getBytes(StandardCharsets.UTF_8)));

return Flux.fromIterable(batchRequest);
}

long getContentLength() {
long contentLength = 0;

for (ByteBuffer request : batchRequest) {
contentLength += request.remaining();
return Mono.error(new UnsupportedOperationException("Empty batch requests aren't allowed."));
}

return contentLength;
}

String getContentType() {
return contentType;
}

BlobBatchOperationResponse<?> getBatchRequest(int contentId) {
return batchMapping.get(contentId);
}

int getOperationCount() {
return batchMapping.size();
BlobBatchOperationInfo operationInfo = new BlobBatchOperationInfo();
Deque<BlobBatchOperation<?>> operations = batchOperationQueue;

// Begin a new batch.
batchOperationQueue = new ConcurrentLinkedDeque<>();

return Flux.generate(sink -> {
if (operations.isEmpty()) {
operationInfo.finalizeBatchOperations();
sink.complete();
} else {
BlobBatchOperation<?> batchOperation = operations.pop();
sink.next(batchOperation.getResponse()
.subscriberContext(Context.of(BATCH_REQUEST_URL_PATH, batchOperation.getRequestUrlPath(),
BATCH_OPERATION_RESPONSE, batchOperation.getBatchOperationResponse(),
BATCH_OPERATION_INFO, operationInfo))
.subscribe());
}
}).then(Mono.just(operationInfo));
}
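
BlobBatchOperationInfo is likewise new in this PR and not shown here. Based on the calls made in this class and in BlobBatchAsyncClient and BlobBatchHelper below, a simplified sketch of what it plausibly does follows; the real class presumably also carries over details from the removed setupBatchOperation, such as appending the sub-request query string and dropping the per-operation x-ms-version header, which this sketch omits.

```java
import com.azure.core.http.HttpRequest;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Inferred per-submission accumulator. Method names match the calls made in this
// PR (addBatchOperation, finalizeBatchOperations, getBody, getContentLength,
// getContentType, getOperationCount, getBatchRequest); bodies loosely mirror the
// multipart serialization removed from BlobBatch.
final class BlobBatchOperationInfo {
    private static final String HTTP_NEWLINE = "\r\n";

    private final String batchBoundary = "batch_" + UUID.randomUUID();
    private final List<ByteBuffer> body = new ArrayList<>();
    private final Map<Integer, BlobBatchOperationResponse<?>> batchMapping = new ConcurrentHashMap<>();
    private final AtomicInteger contentId = new AtomicInteger();

    // Serializes one sub-request into its multipart section and maps its Content-ID
    // to the response placeholder handed back to the caller.
    void addBatchOperation(BlobBatchOperationResponse<?> response, HttpRequest request) {
        int id = contentId.getAndIncrement();
        StringBuilder section = new StringBuilder()
            .append("--").append(batchBoundary).append(HTTP_NEWLINE)
            .append("Content-Type: application/http").append(HTTP_NEWLINE)
            .append("Content-Transfer-Encoding: binary").append(HTTP_NEWLINE)
            .append("Content-ID: ").append(id).append(HTTP_NEWLINE).append(HTTP_NEWLINE)
            .append(request.getHttpMethod()).append(' ')
            .append(request.getUrl().getPath()).append(" HTTP/1.1").append(HTTP_NEWLINE);
        request.getHeaders().forEach(header ->
            section.append(header.getName()).append(": ").append(header.getValue()).append(HTTP_NEWLINE));
        section.append(HTTP_NEWLINE);

        body.add(ByteBuffer.wrap(section.toString().getBytes(StandardCharsets.UTF_8)));
        response.setRequest(request);
        batchMapping.put(id, response);
    }

    // Appends the closing boundary once the operation queue has been drained.
    void finalizeBatchOperations() {
        body.add(ByteBuffer.wrap(("--" + batchBoundary + "--" + HTTP_NEWLINE)
            .getBytes(StandardCharsets.UTF_8)));
    }

    Iterable<ByteBuffer> getBody() {
        return body;
    }

    long getContentLength() {
        return body.stream().mapToLong(ByteBuffer::remaining).sum();
    }

    String getContentType() {
        return "multipart/mixed; boundary=" + batchBoundary;
    }

    int getOperationCount() {
        return batchMapping.size();
    }

    BlobBatchOperationResponse<?> getBatchRequest(int contentId) {
        return batchMapping.get(contentId);
    }
}
```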

/*
@@ -369,9 +318,6 @@ private Mono<HttpResponse> cleanseHeaders(HttpPipelineCallContext context, HttpP

context.getHttpRequest().setHeaders(new HttpHeaders(headers));

// Add the "Content-Id" header which allows this request to be mapped to the response.
context.getHttpRequest().setHeader(CONTENT_ID, context.getData(BATCH_REQUEST_CONTENT_ID).get().toString());

return next.process();
}

@@ -396,38 +342,12 @@ private Mono<HttpResponse> setRequestUrl(HttpPipelineCallContext context, HttpPi
* This will "send" the batch operation request when triggered, it simply acts as a way to build and write the
* batch operation into the overall request and then returns nothing as the response.
*/
private Mono<HttpResponse> setupBatchOperation(HttpRequest request) {
return Mono.fromRunnable(() -> {
int contentId = Integer.parseInt(request.getHeaders().remove(CONTENT_ID).getValue());

StringBuilder batchRequestBuilder = new StringBuilder();
appendWithNewline(batchRequestBuilder, "--" + batchBoundary);
appendWithNewline(batchRequestBuilder, BATCH_OPERATION_CONTENT_TYPE);
appendWithNewline(batchRequestBuilder, BATCH_OPERATION_CONTENT_TRANSFER_ENCODING);
appendWithNewline(batchRequestBuilder, String.format(BATCH_OPERATION_CONTENT_ID_TEMPLATE, contentId));
batchRequestBuilder.append(BlobBatchHelper.HTTP_NEWLINE);

String method = request.getHttpMethod().toString();
String urlPath = request.getUrl().getPath();
String urlQuery = request.getUrl().getQuery();
if (!CoreUtils.isNullOrEmpty(urlQuery)) {
urlPath = urlPath + "?" + urlQuery;
}
appendWithNewline(batchRequestBuilder, String.format(OPERATION_TEMPLATE, method, urlPath, HTTP_VERSION));

request.getHeaders().stream()
.filter(header -> !X_MS_VERSION.equalsIgnoreCase(header.getName()))
.forEach(header -> appendWithNewline(batchRequestBuilder,
String.format(HEADER_TEMPLATE, header.getName(), header.getValue())));

batchRequestBuilder.append(BlobBatchHelper.HTTP_NEWLINE);

batchRequest.add(ByteBuffer.wrap(batchRequestBuilder.toString().getBytes(StandardCharsets.UTF_8)));
batchMapping.get(contentId).setRequest(request);
});
}
private Mono<HttpResponse> buildBatchOperation(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
BlobBatchOperationInfo operationInfo = (BlobBatchOperationInfo) context.getData(BATCH_OPERATION_INFO).get();
BlobBatchOperationResponse<?> batchOperationResponse =
(BlobBatchOperationResponse<?>) context.getData(BATCH_OPERATION_RESPONSE).get();
operationInfo.addBatchOperation(batchOperationResponse, context.getHttpRequest());

private void appendWithNewline(StringBuilder stringBuilder, String value) {
stringBuilder.append(value).append(BlobBatchHelper.HTTP_NEWLINE);
return Mono.empty();
}
}
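
The comment above describes the key trick in the reworked pipeline: buildBatchOperation sits at the end of the batch pipeline and returns an empty Mono, so each sub-request runs through the policies and is recorded without any network call being made. A standalone illustration of that short-circuit pattern follows; it is only a sketch and assumes azure-core plus an HttpClient provider (e.g. azure-core-http-netty) on the classpath, which is never actually invoked.

```java
import com.azure.core.http.HttpMethod;
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.HttpPipelineBuilder;
import com.azure.core.http.HttpRequest;
import reactor.core.publisher.Mono;

import java.net.MalformedURLException;
import java.net.URL;

public class ShortCircuitPolicySketch {
    public static void main(String[] args) throws MalformedURLException {
        // The final policy captures the request and completes empty; next.process()
        // is never called, so the underlying HttpClient never sends anything.
        HttpPipeline pipeline = new HttpPipelineBuilder()
            .policies((context, next) -> {
                System.out.println("Captured: " + context.getHttpRequest().getUrl());
                return Mono.empty();
            })
            .build();

        pipeline.send(new HttpRequest(HttpMethod.DELETE,
            new URL("https://example.com/container/blob")))
            .subscribe();
    }
}
```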
@@ -12,8 +12,8 @@
import com.azure.core.http.rest.PagedFlux;
import com.azure.core.http.rest.PagedResponse;
import com.azure.core.http.rest.Response;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.BlobServiceVersion;
import com.azure.storage.blob.implementation.AzureBlobStorageBuilder;
@@ -22,6 +20,7 @@
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.DeleteSnapshotsOptionType;
import com.azure.storage.common.implementation.StorageImplUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
@@ -103,8 +104,8 @@ public Mono<Void> submitBatch(BlobBatch batch) {
* @return A response only containing header and status code information, used to indicate that the batch operation
* has completed.
* @throws BlobStorageException If the batch request is malformed.
* @throws BlobBatchStorageException If {@code throwOnAnyFailure} is {@code true} and any request in the
* {@link BlobBatch} failed.
* @throws BlobBatchStorageException If {@code throwOnAnyFailure} is {@code true} and any request in the {@link
* BlobBatch} failed.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<Void>> submitBatchWithResponse(BlobBatch batch, boolean throwOnAnyFailure) {
@@ -116,9 +117,12 @@ public Mono<Response<Void>> submitBatchWithResponse(BlobBatch batch, boolean thr
}

Mono<Response<Void>> submitBatchWithResponse(BlobBatch batch, boolean throwOnAnyFailure, Context context) {
return client.services().submitBatchWithRestResponseAsync(
batch.getBody(), batch.getContentLength(), batch.getContentType(), context)
.flatMap(response -> BlobBatchHelper.mapBatchResponse(batch, response, throwOnAnyFailure, logger));
return batch.prepareBlobBatchSubmission()
.flatMap(batchOperationInfo -> client.services()
.submitBatchWithRestResponseAsync(Flux.fromIterable(batchOperationInfo.getBody()),
batchOperationInfo.getContentLength(), batchOperationInfo.getContentType(), context)
.flatMap(response ->
BlobBatchHelper.mapBatchResponse(batchOperationInfo, response, throwOnAnyFailure, logger)));
}
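
A brief, hedged sketch of calling the API above: it assumes serviceAsyncClient is an already-built BlobServiceAsyncClient and that BlobBatchClientBuilder#buildAsyncClient and BlobBatchAsyncClient#getBlobBatch behave as in the published azure-storage-blob-batch surface; names are placeholders.

```java
// Build the batch client from an existing BlobServiceAsyncClient (assumed to exist).
BlobBatchAsyncClient batchAsyncClient =
    new BlobBatchClientBuilder(serviceAsyncClient).buildAsyncClient();

BlobBatch batch = batchAsyncClient.getBlobBatch();
// The per-operation Response is only populated once the batch has been submitted
// and mapped; reading it earlier throws.
Response<Void> deleteResponse = batch.deleteBlob("container", "blob1");

// With throwOnAnyFailure = true, any failed sub-request surfaces as a
// BlobBatchStorageException through the error signal.
batchAsyncClient.submitBatchWithResponse(batch, true)
    .subscribe(
        response -> System.out.println("Batch status: " + response.getStatusCode()
            + ", delete status: " + deleteResponse.getStatusCode()),
        error -> System.err.println("Batch submission failed: " + error));
```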

/**
@@ -51,8 +51,8 @@ class BlobBatchHelper {
.compile("application\\/http", Pattern.CASE_INSENSITIVE);

// This method connects the batch response values to the individual batch operations based on their Content-Id
static Mono<SimpleResponse<Void>> mapBatchResponse(BlobBatch batch, ServicesSubmitBatchResponse batchResponse,
boolean throwOnAnyFailure, ClientLogger logger) {
static Mono<SimpleResponse<Void>> mapBatchResponse(BlobBatchOperationInfo batchOperationInfo,
ServicesSubmitBatchResponse batchResponse, boolean throwOnAnyFailure, ClientLogger logger) {
/*
* Content-Type will contain the boundary for each batch response. The expected format is:
* "Content-Type: multipart/mixed; boundary=batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed"
@@ -74,7 +74,7 @@ static Mono<SimpleResponse<Void>> mapBatchResponse(BlobBatch batch, ServicesSubm
List<BlobStorageException> exceptions = new ArrayList<>();

String[] subResponses = body.split("--" + boundary);
if (subResponses.length == 3 && batch.getOperationCount() != 1) {
if (subResponses.length == 3 && batchOperationInfo.getOperationCount() != 1) {
String[] exceptionSections = subResponses[1].split(HTTP_NEWLINE + HTTP_NEWLINE);
int statusCode = getStatusCode(exceptionSections[1], logger);
HttpHeaders headers = getHttpHeaders(exceptionSections[1]);
@@ -95,7 +95,7 @@ static Mono<SimpleResponse<Void>> mapBatchResponse(BlobBatch batch, ServicesSubm

// The first section will contain batching metadata.
BlobBatchOperationResponse<?> batchOperationResponse =
getBatchOperation(batch, subResponseSections[0], logger);
getBatchOperation(batchOperationInfo, subResponseSections[0], logger);

// The second section will contain status code and header information.
batchOperationResponse.setStatusCode(getStatusCode(subResponseSections[1], logger));
@@ -117,8 +117,8 @@ static Mono<SimpleResponse<Void>> mapBatchResponse(BlobBatch batch, ServicesSubm
}));
}
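
To make the two lookups in mapBatchResponse concrete, here is a small self-contained sketch of extracting the boundary from the Content-Type header and the Content-ID from a sub-response's metadata section. The patterns below are simplified stand-ins for the ones this class defines (only CONTENT_ID_PATTERN is visible in this view), and the sample strings come from the comments above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BatchResponseParsingSketch {
    // Simplified stand-ins for the patterns BlobBatchHelper defines.
    private static final Pattern BOUNDARY_PATTERN =
        Pattern.compile("boundary=(\\S+)", Pattern.CASE_INSENSITIVE);
    private static final Pattern CONTENT_ID_PATTERN =
        Pattern.compile("Content-ID:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);

    public static void main(String[] args) {
        // Example values taken from the comments in mapBatchResponse above.
        String contentType = "multipart/mixed; boundary=batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed";
        String subResponseMetadata = "Content-Type: application/http\r\nContent-ID: 0\r\n";

        Matcher boundaryMatcher = BOUNDARY_PATTERN.matcher(contentType);
        Matcher contentIdMatcher = CONTENT_ID_PATTERN.matcher(subResponseMetadata);

        if (boundaryMatcher.find() && contentIdMatcher.find()) {
            String boundary = boundaryMatcher.group(1);
            int contentId = Integer.parseInt(contentIdMatcher.group(1));
            // The boundary splits the body into sub-responses; the Content-ID links a
            // sub-response back to the BlobBatchOperationResponse registered for it.
            System.out.printf("boundary=%s, contentId=%d%n", boundary, contentId);
        }
    }
}
```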

private static BlobBatchOperationResponse<?> getBatchOperation(BlobBatch batch, String responseBatchInfo,
ClientLogger logger) {
private static BlobBatchOperationResponse<?> getBatchOperation(BlobBatchOperationInfo batchOperationInfo,
String responseBatchInfo, ClientLogger logger) {
Matcher contentIdMatcher = CONTENT_ID_PATTERN.matcher(responseBatchInfo);

int contentId;
@@ -129,7 +129,7 @@ private static BlobBatchOperationResponse<?> getBatchOperation(BlobBatch batch,
new IllegalStateException("Batch operation response doesn't contain a 'Content-Id' header."));
}

return batch.getBatchRequest(contentId).setResponseReceived();
return batchOperationInfo.getBatchRequest(contentId).setResponseReceived();
}

private static int getStatusCode(String responseMetadata, ClientLogger logger) {