Skip to content

Commit

Permalink
Ensure BinaryData responses are durable in polling operations (#36999)
Browse files Browse the repository at this point in the history
Ensure BinaryData responses are durable in polling operations
  • Loading branch information
alzimmermsft authored Oct 5, 2023
1 parent d0cfcce commit 81af98c
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiConsumer;

/**
* A {@link BinaryDataContent} implementation which is backed by a {@link Flux} of {@link ByteBuffer}.
Expand Down Expand Up @@ -125,31 +124,47 @@ public BinaryDataContent toReplayableContent() {
return replayableContent;
}

Flux<ByteBuffer> bufferedFlux = content
.map(buffer -> {
// deep copy direct buffers
ByteBuffer copy = ByteBuffer.allocate(buffer.remaining());
copy.put(buffer);
copy.flip();
return copy;
})
// collectList() uses ArrayList. We don't want to be bound by array capacity
// and we don't need random access.
.collect(LinkedList::new, (BiConsumer<LinkedList<ByteBuffer>, ByteBuffer>) LinkedList::add)
.cache()
.flatMapMany(
// Duplicate buffers on re-subscription.
listOfBuffers -> Flux.fromIterable(listOfBuffers).map(ByteBuffer::duplicate));
replayableContent = new FluxByteBufferContent(bufferedFlux, length, true);
cachedReplayableContent.set(replayableContent);
return replayableContent;
return bufferContent().map(bufferedData -> {
FluxByteBufferContent bufferedContent = new FluxByteBufferContent(Flux.fromIterable(bufferedData)
.map(ByteBuffer::duplicate), length, true);
cachedReplayableContent.set(bufferedContent);

return bufferedContent;
}).block();
}

@Override
public Mono<BinaryDataContent> toReplayableContentAsync() {
return Mono.fromCallable(this::toReplayableContent);
if (isReplayable) {
return Mono.just(this);
}

FluxByteBufferContent replayableContent = cachedReplayableContent.get();
if (replayableContent != null) {
return Mono.just(replayableContent);
}

return bufferContent().cache().map(bufferedData -> {
Flux<ByteBuffer> bufferedFluxData = Flux.fromIterable(bufferedData).map(ByteBuffer::asReadOnlyBuffer);
FluxByteBufferContent bufferedBinaryDataContent = new FluxByteBufferContent(bufferedFluxData, length, true);
cachedReplayableContent.set(bufferedBinaryDataContent);

return bufferedBinaryDataContent;
});
}

private Mono<LinkedList<ByteBuffer>> bufferContent() {
// collectList() uses ArrayList, we don't want to be bound by array capacity and don't need random access
return content.map(buffer -> {
// deep copy direct buffers
ByteBuffer copy = ByteBuffer.allocate(buffer.remaining());
copy.put(buffer);
copy.flip();
return copy;
}).collect(LinkedList::new, LinkedList::add);
}


@Override
public BinaryDataContentType getContentType() {
return BinaryDataContentType.BINARY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static BinaryData serializeResponseSync(Object response, ObjectSerializer
public static <T> Mono<T> deserializeResponse(BinaryData binaryData, ObjectSerializer serializer,
TypeReference<T> typeReference) {
if (TypeUtil.isTypeOrSubTypeOf(BinaryData.class, typeReference.getJavaType())) {
return Mono.just((T) binaryData);
return (Mono<T>) binaryData.toReplayableBinaryDataAsync();
} else {
return binaryData.toObjectAsync(typeReference, serializer);
}
Expand All @@ -83,7 +83,7 @@ public static <T> Mono<T> deserializeResponse(BinaryData binaryData, ObjectSeria
public static <T> T deserializeResponseSync(BinaryData binaryData, ObjectSerializer serializer,
TypeReference<T> typeReference) {
if (TypeUtil.isTypeOrSubTypeOf(BinaryData.class, typeReference.getJavaType())) {
return (T) binaryData;
return (T) binaryData.toReplayableBinaryData();
} else {
return binaryData.toObject(typeReference, serializer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -68,6 +69,7 @@
import static com.azure.core.CoreTestUtils.fillArray;
import static com.azure.core.CoreTestUtils.readStream;
import static com.azure.core.implementation.util.BinaryDataContent.STREAM_READ_SIZE;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
Expand Down Expand Up @@ -1049,6 +1051,68 @@ public void binaryDataAsPropertyDeserialization() throws IOException {
assertEquals(expected.getProperty().toString(), actual.getProperty().toString());
}

@Test
public void emptyFluxByteBufferToReplayable() {
BinaryData binaryData = BinaryData.fromFlux(Flux.empty()).block();

BinaryData replayable = assertDoesNotThrow(() -> binaryData.toReplayableBinaryData());
assertEquals("", replayable.toString());
}

@Test
public void emptyFluxByteBufferToReplayableAsync() {
StepVerifier.create(BinaryData.fromFlux(Flux.empty())
.flatMap(BinaryData::toReplayableBinaryDataAsync))
.assertNext(replayable -> assertEquals("", replayable.toString()))
.verifyComplete();
}

/**
* Tests that {@link FluxByteBufferContent#toReplayableContent()} eagerly makes the {@link FluxByteBufferContent}
* replayable. Before, this method wouldn't make the content replayable until the return
* {@link FluxByteBufferContent} was consumed, which defeated the purpose of the method as the underlying data could
* be reclaimed or consumed before it was made replayable.
*/
@Test
public void fluxByteBufferToReplayableEagerlyConvertsToReplayable() {
byte[] data = new byte[1024];
ThreadLocalRandom.current().nextBytes(data);
byte[] expectedData = CoreUtils.clone(data);

BinaryDataContent binaryDataContent = new FluxByteBufferContent(Flux.just(ByteBuffer.wrap(data)))
.toReplayableContent();

Arrays.fill(data, (byte) 0);

assertArraysEqual(expectedData, binaryDataContent.toBytes());
}

/**
* Tests that {@link FluxByteBufferContent} returned by {@link FluxByteBufferContent#toReplayableContentAsync()}
* won't attempt to access the original {@link Flux Flux&lt;ByteBuffer&gt;} as the initial duplicated is cached as a
* stream of {@link ByteBuffer ByteBuffers} that are shared to all subscribers, and duplicated in each subscription
* so that the underlying content cannot be modified.
*/
@Test
public void multipleSubscriptionsToReplayableAsyncFluxByteBufferAreConsistent() {
byte[] data = new byte[1024];
ThreadLocalRandom.current().nextBytes(data);
byte[] expectedData = CoreUtils.clone(data);

Mono<BinaryDataContent> binaryDataContentMono = new FluxByteBufferContent(Flux.just(ByteBuffer.wrap(data)))
.toReplayableContentAsync();

StepVerifier.create(binaryDataContentMono)
.assertNext(binaryDataContent -> assertArraysEqual(expectedData, binaryDataContent.toBytes()))
.verifyComplete();

Arrays.fill(data, (byte) 0);

StepVerifier.create(binaryDataContentMono)
.assertNext(binaryDataContent -> assertArraysEqual(expectedData, binaryDataContent.toBytes()))
.verifyComplete();
}

public static final class BinaryDataAsProperty {
@JsonProperty("property")
private BinaryData property;
Expand Down

0 comments on commit 81af98c

Please sign in to comment.