Enhance Blob Test Reliability #6566

Merged · 28 commits · Dec 11, 2019

Commits (28)
897eb39
Updating tests to enhance reliability
alzimmermsft Nov 26, 2019
1a137c1
Merge branch 'master' into AzStorage_EnhanceTestReliability
alzimmermsft Nov 26, 2019
ca90f15
Add hook to prevent additional logging
alzimmermsft Nov 26, 2019
752d21b
Merge branch 'master' into AzStorage_EnhanceTestReliability
alzimmermsft Nov 27, 2019
1f73dab
Print statement to help with reliability debugging
alzimmermsft Nov 27, 2019
984a5f2
Remove some blocking calls
alzimmermsft Nov 27, 2019
668e717
Removing more blocking calls in favor of StepVerifier
alzimmermsft Nov 27, 2019
e23ea12
Updating playback records
alzimmermsft Nov 27, 2019
081f419
Merge branch 'master' into AzStorage_EnhanceTestReliability
alzimmermsft Nov 27, 2019
a091a56
Revert on error resume and fixed linting issue
alzimmermsft Nov 27, 2019
fed3c6a
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
sima-zhu Dec 5, 2019
064bd0f
Added retry limits and concurrency limits
sima-zhu Dec 5, 2019
af55be6
change log level back to error
sima-zhu Dec 5, 2019
e525d06
change log level back to error
sima-zhu Dec 6, 2019
d5c4c88
Remove extra retry
sima-zhu Dec 6, 2019
e907330
Make small changes to detect the issue
sima-zhu Dec 6, 2019
dfcbf8a
Added some message
sima-zhu Dec 6, 2019
2f981ab
Print the system value
sima-zhu Dec 6, 2019
7c98879
Added sys value into commandline
sima-zhu Dec 6, 2019
b283463
literally setup
sima-zhu Dec 6, 2019
b9a323b
Remove invalid commands
sima-zhu Dec 6, 2019
455c694
Added retry back
sima-zhu Dec 6, 2019
ace585a
Remove the one failed at local
sima-zhu Dec 6, 2019
49b3d42
Remove debug
sima-zhu Dec 6, 2019
ad9e6b6
Remove logging info
sima-zhu Dec 7, 2019
426e70c
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
sima-zhu Dec 10, 2019
7619f5e
Make some changes to settings and pull mainline
sima-zhu Dec 10, 2019
1d5609d
Revert back the comment tests
sima-zhu Dec 11, 2019
2 changes: 1 addition & 1 deletion eng/pipelines/client.yml
Expand Up @@ -12,7 +12,7 @@ resources:
endpoint: azure

variables:
DefaultOptions: '--batch-mode -Dmaven.wagon.http.pool=false --settings eng/settings.xml'
DefaultOptions: '--batch-mode --fail-at-end -Dmaven.wagon.http.pool=false --settings eng/settings.xml'
LoggingOptions: '-Dorg.slf4j.simpleLogger.defaultLogLevel=error -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn'

jobs:
Expand Down
2 changes: 1 addition & 1 deletion eng/pipelines/templates/jobs/archetype-sdk-tests.yml
Expand Up @@ -10,7 +10,7 @@ parameters:
TestName: LiveTest
TimeoutInMinutes: 60
TestStepMavenInputs:
options: '--batch-mode --fail-at-end -Dmaven.wagon.http.pool=false -Dsurefire.rerunFailingTestsCount=3 --settings eng/settings.xml'
options: '--batch-mode -Dmaven.wagon.http.pool=false -Dsurefire.rerunFailingTestsCount=3 --settings eng/settings.xml'
Comment from a Member (PR author):
We definitely want to keep --fail-at-end here: we have many multi-module services, and without it a failure in an earlier module would keep the later modules from ever reporting their test results.

Reply from a Contributor:

Yes, I will revert it.

mavenOptions: '-Xmx3072m -Dorg.slf4j.simpleLogger.defaultLogLevel=error -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn'
javaHomeOption: 'JDKVersion'
jdkVersionOption: '1.8'
Expand Down
Expand Up @@ -74,8 +74,8 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static com.azure.core.util.FluxUtil.monoError;
import static com.azure.core.util.FluxUtil.fluxError;
import static com.azure.core.util.FluxUtil.monoError;
import static com.azure.core.util.FluxUtil.withContext;
import static java.lang.StrictMath.toIntExact;

Expand Down Expand Up @@ -322,19 +322,19 @@ public PollerFlux<BlobCopyInfo, Void> beginCopy(String sourceUrl, Duration pollI
* @param metadata Metadata to associate with the destination blob.
* @param tier {@link AccessTier} for the destination blob.
* @param priority {@link RehydratePriority} for rehydrating the blob.
* @param sourceModifiedRequestConditions {@link RequestConditions} against the source. Standard HTTP
* Access conditions related to the modification of data. ETag and LastModifiedTime are used to construct
* conditions related to when the blob was changed relative to the given request. The request will fail if the
* specified condition is not satisfied.
* @param sourceModifiedRequestConditions {@link RequestConditions} against the source. Standard HTTP Access
* conditions related to the modification of data. ETag and LastModifiedTime are used to construct conditions
* related to when the blob was changed relative to the given request. The request will fail if the specified
* condition is not satisfied.
* @param destRequestConditions {@link BlobRequestConditions} against the destination.
* @param pollInterval Duration between each poll for the copy status. If none is specified, a default of one second
* is used.
* @return A {@link PollerFlux} that polls the blob copy operation until it has completed, has failed, or has been
* cancelled.
*/
public PollerFlux<BlobCopyInfo, Void> beginCopy(String sourceUrl, Map<String, String> metadata, AccessTier tier,
RehydratePriority priority, RequestConditions sourceModifiedRequestConditions,
BlobRequestConditions destRequestConditions, Duration pollInterval) {
RehydratePriority priority, RequestConditions sourceModifiedRequestConditions,
BlobRequestConditions destRequestConditions, Duration pollInterval) {

final Duration interval = pollInterval != null ? pollInterval : Duration.ofSeconds(1);
final RequestConditions sourceModifiedCondition = sourceModifiedRequestConditions == null
Expand Down Expand Up @@ -367,9 +367,9 @@ public PollerFlux<BlobCopyInfo, Void> beginCopy(String sourceUrl, Map<String, St
}
},
(pollingContext, firstResponse) -> {
if (firstResponse == null || firstResponse.getValue() == null) {
if (firstResponse == null || firstResponse.getValue() == null) {
return Mono.error(logger.logExceptionAsError(
new IllegalArgumentException("Cannot cancel a poll response that never started.")));
new IllegalArgumentException("Cannot cancel a poll response that never started.")));
}
final String copyIdentifier = firstResponse.getValue().getCopyId();

Expand All @@ -385,8 +385,8 @@ public PollerFlux<BlobCopyInfo, Void> beginCopy(String sourceUrl, Map<String, St
}
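
A minimal usage sketch of the poller returned above, assuming an existing BlobAsyncClient named client and a readable sourceUrl (neither is part of this diff):

    // Poll the server-side copy every two seconds and print each reported status.
    client.beginCopy(sourceUrl, Duration.ofSeconds(2))
        .subscribe(pollResponse ->
            System.out.printf("Copy status: %s%n", pollResponse.getStatus()));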

private Mono<BlobCopyInfo> onStart(String sourceUrl, Map<String, String> metadata, AccessTier tier,
RehydratePriority priority, RequestConditions sourceModifiedRequestConditions,
BlobRequestConditions destinationRequestConditions) {
RehydratePriority priority, RequestConditions sourceModifiedRequestConditions,
BlobRequestConditions destinationRequestConditions) {
URL url;
try {
url = new URL(sourceUrl);
Expand Down Expand Up @@ -420,7 +420,7 @@ private Mono<PollResponse<BlobCopyInfo>> onPoll(PollResponse<BlobCopyInfo> pollR
if (lastInfo == null) {
logger.warning("BlobCopyInfo does not exist. Activation operation failed.");
return Mono.just(new PollResponse<>(
LongRunningOperationStatus.fromString("COPY_START_FAILED", true), null));
LongRunningOperationStatus.fromString("COPY_START_FAILED", true), null));
}

return getProperties().map(response -> {
Expand Down Expand Up @@ -462,11 +462,11 @@ private Mono<PollResponse<BlobCopyInfo>> onPoll(PollResponse<BlobCopyInfo> pollR
* <p>For more information, see the
* <a href="https://docs.microsoft.com/en-us/rest/api/storageservices/abort-copy-blob">Azure Docs</a></p>
*
* @param copyId The id of the copy operation to abort.
* @return A reactive response signalling completion.
* @see #copyFromUrl(String)
* @see #beginCopy(String, Duration)
* @see #beginCopy(String, Map, AccessTier, RehydratePriority, RequestConditions, BlobRequestConditions, Duration)
* @param copyId The id of the copy operation to abort.
* @return A reactive response signalling completion.
*/
public Mono<Void> abortCopyFromUrl(String copyId) {
try {
Expand All @@ -486,12 +486,12 @@ public Mono<Void> abortCopyFromUrl(String copyId) {
* <p>For more information, see the
* <a href="https://docs.microsoft.com/en-us/rest/api/storageservices/abort-copy-blob">Azure Docs</a></p>
*
* @see #copyFromUrl(String)
* @see #beginCopy(String, Duration)
* @see #beginCopy(String, Map, AccessTier, RehydratePriority, RequestConditions, BlobRequestConditions, Duration)
* @param copyId The id of the copy operation to abort.
* @param leaseId The lease ID the active lease on the blob must match.
* @return A reactive response signalling completion.
* @see #copyFromUrl(String)
* @see #beginCopy(String, Duration)
* @see #beginCopy(String, Map, AccessTier, RehydratePriority, RequestConditions, BlobRequestConditions, Duration)
*/
public Mono<Response<Void>> abortCopyFromUrlWithResponse(String copyId, String leaseId) {
try {
Expand Down Expand Up @@ -549,8 +549,8 @@ public Mono<String> copyFromUrl(String copySource) {
* @return A reactive response containing the copy ID for the long running operation.
*/
public Mono<Response<String>> copyFromUrlWithResponse(String copySource, Map<String, String> metadata,
AccessTier tier, RequestConditions sourceModifiedRequestConditions,
BlobRequestConditions destRequestConditions) {
AccessTier tier, RequestConditions sourceModifiedRequestConditions,
BlobRequestConditions destRequestConditions) {
try {
return withContext(context -> copyFromUrlWithResponse(copySource, metadata, tier,
sourceModifiedRequestConditions, destRequestConditions, context));
Expand All @@ -560,8 +560,8 @@ public Mono<Response<String>> copyFromUrlWithResponse(String copySource, Map<Str
}
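
For reference, a hedged call sketch for this overload; client and sourceUrl are assumptions, and null is passed where no metadata or conditions are needed:

    // Issue the copy and emit the service-assigned copy identifier once the request completes.
    client.copyFromUrlWithResponse(sourceUrl, null /* metadata */, AccessTier.HOT,
            null /* source conditions */, null /* destination conditions */)
        .subscribe(response -> System.out.println("Copy id: " + response.getValue()));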

Mono<Response<String>> copyFromUrlWithResponse(String copySource, Map<String, String> metadata, AccessTier tier,
RequestConditions sourceModifiedRequestConditions, BlobRequestConditions destRequestConditions,
Context context) {
RequestConditions sourceModifiedRequestConditions, BlobRequestConditions destRequestConditions,
Context context) {
sourceModifiedRequestConditions = sourceModifiedRequestConditions == null
? new RequestConditions() : sourceModifiedRequestConditions;
destRequestConditions = destRequestConditions == null ? new BlobRequestConditions() : destRequestConditions;
Expand Down Expand Up @@ -764,44 +764,42 @@ private Mono<Response<BlobProperties>> downloadToFileImpl(AsynchronousFileChanne
* Downloads the first chunk and gets the size of the data and etag if not specified by the user.
*/
return getSetupMono(finalRange, finalParallelTransferOptions, downloadRetryOptions, requestConditions,
rangeGetContentMd5, context)
.flatMap(setupTuple3 -> {
long newCount = setupTuple3.getT1();
BlobRequestConditions finalConditions = setupTuple3.getT2();

int numChunks = calculateNumBlocks(newCount, finalParallelTransferOptions.getBlockSize());

// In case it is an empty blob, this ensures we still actually perform a download operation.
numChunks = numChunks == 0 ? 1 : numChunks;

BlobDownloadAsyncResponse initialResponse = setupTuple3.getT3();
return Flux.range(0, numChunks)
.flatMap(chunkNum -> {
// The first chunk was retrieved during setup.
if (chunkNum == 0) {
return writeBodyToFile(initialResponse, file, 0, finalParallelTransferOptions, progressLock,
totalProgress);
}

// Calculate whether we need a full chunk or something smaller because we are at the end.
long chunkSizeActual = Math.min(finalParallelTransferOptions.getBlockSize(),
newCount - (chunkNum.longValue() * finalParallelTransferOptions.getBlockSize().longValue()));
BlobRange chunkRange = new BlobRange(
finalRange.getOffset()
+ (chunkNum.longValue() * finalParallelTransferOptions.getBlockSize().longValue()),
chunkSizeActual);

// Make the download call.
return this.downloadWithResponse(chunkRange, downloadRetryOptions, finalConditions,
rangeGetContentMd5, null)
.subscribeOn(Schedulers.elastic())
.flatMap(response ->
writeBodyToFile(response, file, chunkNum, finalParallelTransferOptions, progressLock,
totalProgress));
})
// Only the first download call returns a value.
.then(Mono.just(buildBlobPropertiesResponse(initialResponse)));
});
rangeGetContentMd5, context)
.flatMap(setupTuple3 -> {
long newCount = setupTuple3.getT1();
BlobRequestConditions finalConditions = setupTuple3.getT2();

int numChunks = calculateNumBlocks(newCount, finalParallelTransferOptions.getBlockSize());

// In case it is an empty blob, this ensures we still actually perform a download operation.
numChunks = numChunks == 0 ? 1 : numChunks;

BlobDownloadAsyncResponse initialResponse = setupTuple3.getT3();
return Flux.range(0, numChunks)
.flatMap(chunkNum -> {
// The first chunk was retrieved during setup.
if (chunkNum == 0) {
return writeBodyToFile(initialResponse, file, 0, finalParallelTransferOptions, progressLock,
totalProgress);
}

// Calculate whether we need a full chunk or something smaller because we are at the end.
long modifier = chunkNum.longValue() * finalParallelTransferOptions.getBlockSize();
long chunkSizeActual = Math.min(finalParallelTransferOptions.getBlockSize(),
newCount - modifier);
BlobRange chunkRange = new BlobRange(finalRange.getOffset() + modifier, chunkSizeActual);

// Make the download call.
return this.downloadWithResponse(chunkRange, downloadRetryOptions, finalConditions,
rangeGetContentMd5, null)
.subscribeOn(Schedulers.elastic())
.flatMap(response ->
writeBodyToFile(response, file, chunkNum, finalParallelTransferOptions, progressLock,
totalProgress));
})
// Only the first download call returns a value.
.then(Mono.just(buildBlobPropertiesResponse(initialResponse)));
});
}
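
The chunk arithmetic above is easier to follow with concrete numbers; a self-contained sketch using assumed sizes:

    // A 10 MB blob with a 4 MB block size downloads as three chunks: 4 MB, 4 MB, and 2 MB.
    long newCount = 10L * 1024 * 1024;  // total bytes to download
    long blockSize = 4L * 1024 * 1024;  // stands in for finalParallelTransferOptions.getBlockSize()
    int numChunks = (int) Math.ceil((double) newCount / blockSize);
    for (int chunkNum = 0; chunkNum < numChunks; chunkNum++) {
        long modifier = chunkNum * blockSize;                         // offset of this chunk
        long chunkSizeActual = Math.min(blockSize, newCount - modifier);
        System.out.printf("chunk %d: offset=%d, size=%d%n", chunkNum, modifier, chunkSizeActual);
    }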

private int calculateNumBlocks(long dataSize, long blockLength) {
Expand Down Expand Up @@ -852,10 +850,10 @@ private Mono<Tuple3<Long, BlobRequestConditions, BlobDownloadAsyncResponse>> get
})
.onErrorResume(BlobStorageException.class, blobStorageException -> {
/*
In the case of an empty blob, we still want to report success and give back valid headers. Attempting a
range download on an empty blob will return an InvalidRange error code and a Content-Range header of the
format "bytes * /0". We need to double check that the total size is zero in the case that the customer
has attempted an invalid range on a non-zero length blob.
* In the case of an empty blob, we still want to report success and give back valid headers.
* Attempting a range download on an empty blob will return an InvalidRange error code and a
* Content-Range header of the format "bytes * /0". We need to double check that the total size is zero
* in the case that the customer has attempted an invalid range on a non-zero length blob.
*/
if (blobStorageException.getErrorCode() == BlobErrorCode.INVALID_RANGE
&& extractTotalBlobLength(blobStorageException.getResponse()
Expand All @@ -876,6 +874,7 @@ && extractTotalBlobLength(blobStorageException.getResponse()
return Mono.zip(Mono.just(0L), Mono.just(requestConditions), Mono.just(response));
});
}

return Mono.error(blobStorageException);
});
}
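
A minimal sketch of the empty-blob check the comment above describes; the header value is an assumption, and the diff does not show the real extractTotalBlobLength implementation:

    // An empty blob answers a range download with InvalidRange and "Content-Range: bytes */0".
    // Only when the reported total size is zero should the error be treated as success.
    String contentRange = "bytes */0";
    long totalLength = Long.parseLong(contentRange.split("/")[1]); // 0 for an empty blob
    boolean treatAsEmptyDownload = totalLength == 0;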
Expand Down Expand Up @@ -940,7 +939,7 @@ private void downloadToFileCleanup(AsynchronousFileChannel channel, String fileP
try {
channel.close();
if (!signalType.equals(SignalType.ON_COMPLETE)) {
Files.delete(Paths.get(filePath));
Files.deleteIfExists(Paths.get(filePath));
logger.verbose("Downloading to file failed. Cleaning up resources.");
}
} catch (IOException e) {
Expand Down
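
The change from Files.delete to Files.deleteIfExists matters when the partial download file was never created; a standalone sketch (assumed path, reusing the java.nio.file imports already in this file):

    // Files.delete throws NoSuchFileException when the file is missing, which would mask
    // the original download failure; deleteIfExists simply returns false instead.
    Path partialDownload = Paths.get("blob-download.tmp");
    boolean removed = Files.deleteIfExists(partialDownload); // safe even if the file is absent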