From 81af98ca447959633c17b04ba31eb27ccebd91cb Mon Sep 17 00:00:00 2001 From: Alan Zimmer <48699787+alzimmermsft@users.noreply.github.com> Date: Thu, 5 Oct 2023 14:08:52 -0700 Subject: [PATCH] Ensure BinaryData responses are durable in polling operations (#36999) Ensure BinaryData responses are durable in polling operations --- .../util/FluxByteBufferContent.java | 55 ++++++++++------ .../polling/implementation/PollingUtils.java | 4 +- .../com/azure/core/util/BinaryDataTest.java | 64 +++++++++++++++++++ 3 files changed, 101 insertions(+), 22 deletions(-) diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/FluxByteBufferContent.java b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/FluxByteBufferContent.java index 23882c21d59bc..cf56a9403c1ae 100644 --- a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/FluxByteBufferContent.java +++ b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/FluxByteBufferContent.java @@ -18,7 +18,6 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.function.BiConsumer; /** * A {@link BinaryDataContent} implementation which is backed by a {@link Flux} of {@link ByteBuffer}. @@ -125,31 +124,47 @@ public BinaryDataContent toReplayableContent() { return replayableContent; } - Flux bufferedFlux = content - .map(buffer -> { - // deep copy direct buffers - ByteBuffer copy = ByteBuffer.allocate(buffer.remaining()); - copy.put(buffer); - copy.flip(); - return copy; - }) - // collectList() uses ArrayList. We don't want to be bound by array capacity - // and we don't need random access. - .collect(LinkedList::new, (BiConsumer, ByteBuffer>) LinkedList::add) - .cache() - .flatMapMany( - // Duplicate buffers on re-subscription. - listOfBuffers -> Flux.fromIterable(listOfBuffers).map(ByteBuffer::duplicate)); - replayableContent = new FluxByteBufferContent(bufferedFlux, length, true); - cachedReplayableContent.set(replayableContent); - return replayableContent; + return bufferContent().map(bufferedData -> { + FluxByteBufferContent bufferedContent = new FluxByteBufferContent(Flux.fromIterable(bufferedData) + .map(ByteBuffer::duplicate), length, true); + cachedReplayableContent.set(bufferedContent); + + return bufferedContent; + }).block(); } @Override public Mono toReplayableContentAsync() { - return Mono.fromCallable(this::toReplayableContent); + if (isReplayable) { + return Mono.just(this); + } + + FluxByteBufferContent replayableContent = cachedReplayableContent.get(); + if (replayableContent != null) { + return Mono.just(replayableContent); + } + + return bufferContent().cache().map(bufferedData -> { + Flux bufferedFluxData = Flux.fromIterable(bufferedData).map(ByteBuffer::asReadOnlyBuffer); + FluxByteBufferContent bufferedBinaryDataContent = new FluxByteBufferContent(bufferedFluxData, length, true); + cachedReplayableContent.set(bufferedBinaryDataContent); + + return bufferedBinaryDataContent; + }); } + private Mono> bufferContent() { + // collectList() uses ArrayList, we don't want to be bound by array capacity and don't need random access + return content.map(buffer -> { + // deep copy direct buffers + ByteBuffer copy = ByteBuffer.allocate(buffer.remaining()); + copy.put(buffer); + copy.flip(); + return copy; + }).collect(LinkedList::new, LinkedList::add); + } + + @Override public BinaryDataContentType getContentType() { return BinaryDataContentType.BINARY; diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/util/polling/implementation/PollingUtils.java b/sdk/core/azure-core/src/main/java/com/azure/core/util/polling/implementation/PollingUtils.java index 0c8a6ed140c0d..31cf08fa0323b 100644 --- a/sdk/core/azure-core/src/main/java/com/azure/core/util/polling/implementation/PollingUtils.java +++ b/sdk/core/azure-core/src/main/java/com/azure/core/util/polling/implementation/PollingUtils.java @@ -63,7 +63,7 @@ public static BinaryData serializeResponseSync(Object response, ObjectSerializer public static Mono deserializeResponse(BinaryData binaryData, ObjectSerializer serializer, TypeReference typeReference) { if (TypeUtil.isTypeOrSubTypeOf(BinaryData.class, typeReference.getJavaType())) { - return Mono.just((T) binaryData); + return (Mono) binaryData.toReplayableBinaryDataAsync(); } else { return binaryData.toObjectAsync(typeReference, serializer); } @@ -83,7 +83,7 @@ public static Mono deserializeResponse(BinaryData binaryData, ObjectSeria public static T deserializeResponseSync(BinaryData binaryData, ObjectSerializer serializer, TypeReference typeReference) { if (TypeUtil.isTypeOrSubTypeOf(BinaryData.class, typeReference.getJavaType())) { - return (T) binaryData; + return (T) binaryData.toReplayableBinaryData(); } else { return binaryData.toObject(typeReference, serializer); } diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/util/BinaryDataTest.java b/sdk/core/azure-core/src/test/java/com/azure/core/util/BinaryDataTest.java index a5a178dc5aea7..04141a4f0f66d 100644 --- a/sdk/core/azure-core/src/test/java/com/azure/core/util/BinaryDataTest.java +++ b/sdk/core/azure-core/src/test/java/com/azure/core/util/BinaryDataTest.java @@ -57,6 +57,7 @@ import java.util.List; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -68,6 +69,7 @@ import static com.azure.core.CoreTestUtils.fillArray; import static com.azure.core.CoreTestUtils.readStream; import static com.azure.core.implementation.util.BinaryDataContent.STREAM_READ_SIZE; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -1049,6 +1051,68 @@ public void binaryDataAsPropertyDeserialization() throws IOException { assertEquals(expected.getProperty().toString(), actual.getProperty().toString()); } + @Test + public void emptyFluxByteBufferToReplayable() { + BinaryData binaryData = BinaryData.fromFlux(Flux.empty()).block(); + + BinaryData replayable = assertDoesNotThrow(() -> binaryData.toReplayableBinaryData()); + assertEquals("", replayable.toString()); + } + + @Test + public void emptyFluxByteBufferToReplayableAsync() { + StepVerifier.create(BinaryData.fromFlux(Flux.empty()) + .flatMap(BinaryData::toReplayableBinaryDataAsync)) + .assertNext(replayable -> assertEquals("", replayable.toString())) + .verifyComplete(); + } + + /** + * Tests that {@link FluxByteBufferContent#toReplayableContent()} eagerly makes the {@link FluxByteBufferContent} + * replayable. Before, this method wouldn't make the content replayable until the return + * {@link FluxByteBufferContent} was consumed, which defeated the purpose of the method as the underlying data could + * be reclaimed or consumed before it was made replayable. + */ + @Test + public void fluxByteBufferToReplayableEagerlyConvertsToReplayable() { + byte[] data = new byte[1024]; + ThreadLocalRandom.current().nextBytes(data); + byte[] expectedData = CoreUtils.clone(data); + + BinaryDataContent binaryDataContent = new FluxByteBufferContent(Flux.just(ByteBuffer.wrap(data))) + .toReplayableContent(); + + Arrays.fill(data, (byte) 0); + + assertArraysEqual(expectedData, binaryDataContent.toBytes()); + } + + /** + * Tests that {@link FluxByteBufferContent} returned by {@link FluxByteBufferContent#toReplayableContentAsync()} + * won't attempt to access the original {@link Flux Flux<ByteBuffer>} as the initial duplicated is cached as a + * stream of {@link ByteBuffer ByteBuffers} that are shared to all subscribers, and duplicated in each subscription + * so that the underlying content cannot be modified. + */ + @Test + public void multipleSubscriptionsToReplayableAsyncFluxByteBufferAreConsistent() { + byte[] data = new byte[1024]; + ThreadLocalRandom.current().nextBytes(data); + byte[] expectedData = CoreUtils.clone(data); + + Mono binaryDataContentMono = new FluxByteBufferContent(Flux.just(ByteBuffer.wrap(data))) + .toReplayableContentAsync(); + + StepVerifier.create(binaryDataContentMono) + .assertNext(binaryDataContent -> assertArraysEqual(expectedData, binaryDataContent.toBytes())) + .verifyComplete(); + + Arrays.fill(data, (byte) 0); + + StepVerifier.create(binaryDataContentMono) + .assertNext(binaryDataContent -> assertArraysEqual(expectedData, binaryDataContent.toBytes())) + .verifyComplete(); + } + public static final class BinaryDataAsProperty { @JsonProperty("property") private BinaryData property;