Skip to content

Commit

Permalink
InputStreamSubscriber/Tests conform to style
Browse files Browse the repository at this point in the history
  • Loading branch information
rstoyanchev committed Oct 28, 2024
1 parent d4b31fd commit 86a42db
Show file tree
Hide file tree
Showing 5 changed files with 448 additions and 362 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -470,8 +470,7 @@ public static Publisher<DataBuffer> outputStreamPublisher(
* any of the {@link InputStream#read} methods
* <p>
* Note: {@link Subscription#request(long)} happens eagerly for the first time upon subscription
* and then repeats every time {@code bufferSize - (bufferSize >> 2)} consumed
*
* and then repeats every time {@code bufferSize - (bufferSize >> 2)} consumed.
* @param publisher the source of {@link DataBuffer} which should be represented as an {@link InputStream}
* @param bufferSize the maximum amount of {@link DataBuffer} prefetched in advance and stored inside {@link InputStream}
* @return an {@link InputStream} instance representing given {@link Publisher} messages
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
/*
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.core.io.buffer;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ConcurrentModificationException;
import java.util.Objects;
import java.util.Queue;
Expand All @@ -27,70 +42,83 @@
* {@link org.springframework.http.client.InputStreamSubscriber}.
*
* @author Oleh Dokuka
* @since 6.1
* @author Rossen Stoyanchev
* @since 6.2
*/
final class InputStreamSubscriber extends InputStream implements Subscriber<DataBuffer> {

static final Object READY = new Object();
static final DataBuffer DONE = DefaultDataBuffer.fromEmptyByteBuffer(DefaultDataBufferFactory.sharedInstance, ByteBuffer.allocate(0));
static final DataBuffer CLOSED = DefaultDataBuffer.fromEmptyByteBuffer(DefaultDataBufferFactory.sharedInstance, ByteBuffer.allocate(0));
private static final Object READY = new Object();

private static final DataBuffer DONE = DefaultDataBufferFactory.sharedInstance.allocateBuffer(0);

private static final DataBuffer CLOSED = DefaultDataBufferFactory.sharedInstance.allocateBuffer(0);


private final int prefetch;

private final int limit;

private final ReentrantLock lock;

private final Queue<DataBuffer> queue;

final int prefetch;
final int limit;
final ReentrantLock lock;
final Queue<DataBuffer> queue;
private final AtomicReference<Object> parkedThread = new AtomicReference<>();

final AtomicReference<Object> parkedThread = new AtomicReference<>();
final AtomicInteger workAmount = new AtomicInteger();
private final AtomicInteger workAmount = new AtomicInteger();

volatile boolean closed;
int consumed;
private volatile boolean closed;

private int consumed;

@Nullable
DataBuffer available;
private DataBuffer available;

@Nullable
Subscription s;
boolean done;
private Subscription subscription;

private boolean done;

@Nullable
Throwable error;
private Throwable error;


InputStreamSubscriber(int prefetch) {
this.prefetch = prefetch;
this.limit = prefetch == Integer.MAX_VALUE ? Integer.MAX_VALUE : prefetch - (prefetch >> 2);
this.limit = (prefetch == Integer.MAX_VALUE ? Integer.MAX_VALUE : prefetch - (prefetch >> 2));
this.queue = new ArrayBlockingQueue<>(prefetch);
this.lock = new ReentrantLock(false);
}


@Override
public void onSubscribe(Subscription subscription) {
if (this.s != null) {
if (this.subscription != null) {
subscription.cancel();
return;
}

this.s = subscription;
subscription.request(prefetch == Integer.MAX_VALUE ? Long.MAX_VALUE : prefetch);
this.subscription = subscription;
subscription.request(this.prefetch == Integer.MAX_VALUE ? Long.MAX_VALUE : this.prefetch);
}

@Override
public void onNext(DataBuffer t) {
Assert.notNull(t, "DataBuffer must not be null");
public void onNext(DataBuffer buffer) {
Assert.notNull(buffer, "DataBuffer must not be null");

if (this.done) {
discard(t);
discard(buffer);
return;
}

if (!queue.offer(t)) {
discard(t);
error = new RuntimeException("Buffer overflow");
done = true;
if (!this.queue.offer(buffer)) {
discard(buffer);
this.error = new RuntimeException("Buffer overflow");
this.done = true;
}

int previousWorkState = addWork();
if (previousWorkState == Integer.MIN_VALUE) {
DataBuffer value = queue.poll();
DataBuffer value = this.queue.poll();
if (value != null) {
discard(value);
}
Expand Down Expand Up @@ -136,51 +164,62 @@ int addWork() {
return Integer.MIN_VALUE;
}

int nextProduced = produced == Integer.MAX_VALUE ? 1 : produced + 1;

int nextProduced = (produced == Integer.MAX_VALUE ? 1 : produced + 1);

if (workAmount.weakCompareAndSetRelease(produced, nextProduced)) {
if (this.workAmount.weakCompareAndSetRelease(produced, nextProduced)) {
return produced;
}
}
}

private void resume() {
if (this.parkedThread != READY) {
Object old = this.parkedThread.getAndSet(READY);
if (old != READY) {
LockSupport.unpark((Thread)old);
}
}
}

/* InputStream implementation */

@Override
public int read() throws IOException {
if (!lock.tryLock()) {
if (!this.lock.tryLock()) {
if (this.closed) {
return -1;
}
throw new ConcurrentModificationException("concurrent access is disallowed");
throw new ConcurrentModificationException("Concurrent access is not allowed");
}

try {
DataBuffer bytes = getBytesOrAwait();
DataBuffer next = getNextOrAwait();

if (bytes == DONE) {
if (next == DONE) {
this.closed = true;
cleanAndFinalize();
if (this.error == null) {
return -1;
}
else {
throw Exceptions.propagate(error);
throw Exceptions.propagate(this.error);
}
} else if (bytes == CLOSED) {
}
else if (next == CLOSED) {
cleanAndFinalize();
return -1;
}

return bytes.read() & 0xFF;
return next.read() & 0xFF;
}
catch (Throwable t) {
catch (Throwable ex) {
this.closed = true;
requiredSubscriber().cancel();
cleanAndFinalize();
throw Exceptions.propagate(t);
throw Exceptions.propagate(ex);
}
finally {
lock.unlock();
this.lock.unlock();
}
}

Expand All @@ -191,7 +230,7 @@ public int read(byte[] b, int off, int len) throws IOException {
return 0;
}

if (!lock.tryLock()) {
if (!this.lock.tryLock()) {
if (this.closed) {
return -1;
}
Expand All @@ -200,9 +239,9 @@ public int read(byte[] b, int off, int len) throws IOException {

try {
for (int j = 0; j < len;) {
DataBuffer bytes = getBytesOrAwait();
DataBuffer next = getNextOrAwait();

if (bytes == DONE) {
if (next == DONE) {
cleanAndFinalize();
if (this.error == null) {
this.closed = true;
Expand All @@ -211,37 +250,37 @@ public int read(byte[] b, int off, int len) throws IOException {
else {
if (j == 0) {
this.closed = true;
throw Exceptions.propagate(error);
throw Exceptions.propagate(this.error);
}

return j;
}
} else if (bytes == CLOSED) {
}
else if (next == CLOSED) {
requiredSubscriber().cancel();
cleanAndFinalize();
return -1;
}
int initialReadPosition = bytes.readPosition();
bytes.read(b, off + j, Math.min(len - j, bytes.readableByteCount()));
j += bytes.readPosition() - initialReadPosition;
int initialReadPosition = next.readPosition();
next.read(b, off + j, Math.min(len - j, next.readableByteCount()));
j += next.readPosition() - initialReadPosition;
}

return len;
}
catch (Throwable t) {
catch (Throwable ex) {
this.closed = true;
requiredSubscriber().cancel();
cleanAndFinalize();
throw Exceptions.propagate(t);
throw Exceptions.propagate(ex);
}
finally {
lock.unlock();
this.lock.unlock();
}
}

DataBuffer getBytesOrAwait() {
private DataBuffer getNextOrAwait() {
if (this.available == null || this.available.readableByteCount() == 0) {

discard(this.available);
this.available = null;

Expand All @@ -251,23 +290,23 @@ DataBuffer getBytesOrAwait() {
return CLOSED;
}

boolean d = this.done;
DataBuffer t = this.queue.poll();
if (t != null) {
boolean done = this.done;
DataBuffer buffer = this.queue.poll();
if (buffer != null) {
int consumed = ++this.consumed;
this.available = t;
this.available = buffer;
if (consumed == this.limit) {
this.consumed = 0;
requiredSubscriber().request(this.limit);
}
break;
}

if (d) {
if (done) {
return DONE;
}

actualWorkAmount = workAmount.addAndGet(-actualWorkAmount);
actualWorkAmount = this.workAmount.addAndGet(-actualWorkAmount);
if (actualWorkAmount == 0) {
await();
}
Expand All @@ -277,15 +316,14 @@ DataBuffer getBytesOrAwait() {
return this.available;
}

void cleanAndFinalize() {
private void cleanAndFinalize() {
discard(this.available);
this.available = null;

for (;;) {
int workAmount = this.workAmount.getPlain();
DataBuffer value;

while((value = queue.poll()) != null) {
while ((value = this.queue.poll()) != null) {
discard(value);
}

Expand All @@ -295,10 +333,6 @@ void cleanAndFinalize() {
}
}

void discard(@Nullable DataBuffer value) {
DataBufferUtils.release(value);
}

@Override
public void close() throws IOException {
if (this.closed) {
Expand All @@ -324,8 +358,12 @@ public void close() throws IOException {
}

private Subscription requiredSubscriber() {
Assert.state(this.s != null, "Subscriber must be subscribed to use InputStream");
return this.s;
Assert.state(this.subscription != null, "Subscriber must be subscribed to use InputStream");
return this.subscription;
}

private void discard(@Nullable DataBuffer buffer) {
DataBufferUtils.release(buffer);
}

private void await() {
Expand All @@ -341,7 +379,7 @@ private void await() {
throw new IllegalStateException("Only one (Virtual)Thread can await!");
}

if (parkedThread.compareAndSet( null, toUnpark)) {
if (this.parkedThread.compareAndSet( null, toUnpark)) {
LockSupport.park();
// we don't just break here because park() can wake up spuriously
// if we got a proper resume, get() == READY and the loop will quit above
Expand All @@ -351,13 +389,4 @@ private void await() {
this.parkedThread.lazySet(null);
}

private void resume() {
if (this.parkedThread != READY) {
Object old = parkedThread.getAndSet(READY);
if (old != READY) {
LockSupport.unpark((Thread)old);
}
}
}

}
Loading

0 comments on commit 86a42db

Please sign in to comment.