Skip to content

Commit

Permalink
Allow the queued byte threshold for a Stream to be ready to be config…
Browse files Browse the repository at this point in the history
…urable

- on clients this is exposed by setting a CallOption
- on servers this is configured by calling a method on ServerCall or ServerStreamListener
  • Loading branch information
jduo committed Mar 5, 2024
1 parent 8feb919 commit 959473b
Show file tree
Hide file tree
Showing 14 changed files with 137 additions and 4 deletions.
20 changes: 20 additions & 0 deletions api/src/main/java/io/grpc/CallOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public final class CallOptions {
private final Integer maxInboundMessageSize;
@Nullable
private final Integer maxOutboundMessageSize;
@Nullable
private final Integer onReadyThreshold;

private CallOptions(Builder builder) {
this.deadline = builder.deadline;
Expand All @@ -91,6 +93,7 @@ private CallOptions(Builder builder) {
this.waitForReady = builder.waitForReady;
this.maxInboundMessageSize = builder.maxInboundMessageSize;
this.maxOutboundMessageSize = builder.maxOutboundMessageSize;
this.onReadyThreshold = builder.onReadyThreshold;
}

static class Builder {
Expand All @@ -105,6 +108,7 @@ static class Builder {
Boolean waitForReady;
Integer maxInboundMessageSize;
Integer maxOutboundMessageSize;
Integer onReadyThreshold;

private CallOptions build() {
return new CallOptions(this);
Expand Down Expand Up @@ -193,6 +197,10 @@ public CallOptions withWaitForReady() {
return builder.build();
}

public Integer getOnReadyThreshold() {
return onReadyThreshold;
}

/**
* Disables 'wait for ready' feature for the call.
* This method should be rarely used because the default is without 'wait for ready'.
Expand All @@ -203,6 +211,18 @@ public CallOptions withoutWaitForReady() {
return builder.build();
}

public CallOptions withOnReadyThreshold(int numBytes) {
Builder builder = toBuilder(this);
builder.onReadyThreshold = numBytes;
return builder.build();
}

public CallOptions clearOnReadyThreshold() {
Builder builder = toBuilder(this);
builder.onReadyThreshold = null;
return builder.build();

Check warning on line 223 in api/src/main/java/io/grpc/CallOptions.java

View check run for this annotation

Codecov / codecov/patch

api/src/main/java/io/grpc/CallOptions.java#L221-L223

Added lines #L221 - L223 were not covered by tests
}

/**
* Returns the compressor's name.
*/
Expand Down
5 changes: 5 additions & 0 deletions api/src/main/java/io/grpc/PartialForwardingServerCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public void setMessageCompression(boolean enabled) {
delegate().setMessageCompression(enabled);
}

@Override
public void setOnReadyThreshold(int numBytes) {
delegate().setOnReadyThreshold(numBytes);
}

@Override
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1704")
public void setCompression(String compressor) {
Expand Down
4 changes: 4 additions & 0 deletions api/src/main/java/io/grpc/ServerCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,10 @@ public void setCompression(String compressor) {
// noop
}

public void setOnReadyThreshold(int numBytes) {
// noop
}

Check warning on line 214 in api/src/main/java/io/grpc/ServerCall.java

View check run for this annotation

Codecov / codecov/patch

api/src/main/java/io/grpc/ServerCall.java#L214

Added line #L214 was not covered by tests

/**
* Returns the level of security guarantee in communications
*
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/io/grpc/internal/AbstractServerStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ public StatsTraceContext statsTraceContext() {
return statsTraceCtx;
}

@Override
public void setOnReadyThreshold(int numBytes) {
transportState().setOnReadyThreshold(numBytes);
}

Check warning on line 183 in core/src/main/java/io/grpc/internal/AbstractServerStream.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/io/grpc/internal/AbstractServerStream.java#L182-L183

Added lines #L182 - L183 were not covered by tests

/**
* This should only be called from the transport thread (except for private interactions with
* {@code AbstractServerStream}).
Expand Down Expand Up @@ -243,6 +248,8 @@ public void deframerClosed(boolean hasPartialMessage) {
}
}



@Override
protected ServerStreamListener listener() {
return listener;
Expand Down
20 changes: 17 additions & 3 deletions core/src/main/java/io/grpc/internal/AbstractStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ public final void flush() {
}
}

protected void setOnReadyThreshold(int numBytes) {
transportState().setOnReadyThreshold(numBytes);
}

Check warning on line 82 in core/src/main/java/io/grpc/internal/AbstractStream.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/io/grpc/internal/AbstractStream.java#L81-L82

Added lines #L81 - L82 were not covered by tests

/**
* Closes the underlying framer. Should be called when the outgoing stream is gracefully closed
* (half closure on client; closure on server).
Expand Down Expand Up @@ -143,6 +147,9 @@ public abstract static class TransportState
@GuardedBy("onReadyLock")
private boolean deallocated;

@GuardedBy("onReadyLock")
private int onReadyThreshold;

protected TransportState(
int maxMessageSize,
StatsTraceContext statsTraceCtx,
Expand All @@ -157,6 +164,7 @@ protected TransportState(
transportTracer);
// TODO(#7168): use MigratingThreadDeframer when enabling retry doesn't break.
deframer = rawDeframer;
onReadyThreshold = DEFAULT_ONREADY_THRESHOLD;
}

final void optimizeForDirectExecutor() {
Expand All @@ -178,6 +186,12 @@ final void setMaxInboundMessageSize(int maxSize) {
*/
protected abstract StreamListener listener();

void setOnReadyThreshold(int numBytes) {
synchronized (onReadyLock) {
this.onReadyThreshold = numBytes;
}
}

@Override
public void messagesAvailable(StreamListener.MessageProducer producer) {
listener().messagesAvailable(producer);
Expand Down Expand Up @@ -259,7 +273,7 @@ protected final void setDecompressor(Decompressor decompressor) {

private boolean isReady() {
synchronized (onReadyLock) {
return allocated && numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD && !deallocated;
return allocated && numSentBytesQueued < onReadyThreshold && !deallocated;
}
}

Expand Down Expand Up @@ -316,9 +330,9 @@ public final void onSentBytes(int numBytes) {
synchronized (onReadyLock) {
checkState(allocated,
"onStreamAllocated was not called, but it seems the stream is active");
boolean belowThresholdBefore = numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD;
boolean belowThresholdBefore = numSentBytesQueued < onReadyThreshold;
numSentBytesQueued -= numBytes;
boolean belowThresholdAfter = numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD;
boolean belowThresholdAfter = numSentBytesQueued < onReadyThreshold;
doNotify = !belowThresholdBefore && belowThresholdAfter;
}
if (doNotify) {
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/io/grpc/internal/ServerCallImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ public void setMessageCompression(boolean enable) {
stream.setMessageCompression(enable);
}

@Override
public void setOnReadyThreshold(int numBytes) {
stream.setOnReadyThreshold(numBytes);
}

Check warning on line 190 in core/src/main/java/io/grpc/internal/ServerCallImpl.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/io/grpc/internal/ServerCallImpl.java#L189-L190

Added lines #L189 - L190 were not covered by tests

@Override
public void setCompression(String compressorName) {
// Added here to give a better error message.
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/io/grpc/internal/ServerStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,11 @@ public interface ServerStream extends Stream {
* The HTTP/2 stream id, or {@code -1} if not supported.
*/
int streamId();

/**
* A hint to the stream that specifies how many bytes must be queued before
* {@link StreamListener#onReady()} will be called. A stream may ignore this property if
* unsupported.
*/
void setOnReadyThreshold(int numBytes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -482,6 +483,34 @@ public void appendTimeoutInsight() {
assertThat(insight.toString()).isEqualTo("[remote_addr=fake_server_addr]");
}

@Test
public void overrideOnReadyThreshold() {
AbstractClientStream.Sink sink = mock(AbstractClientStream.Sink.class);
BaseTransportState state = new BaseTransportState(statsTraceCtx, transportTracer);
AbstractClientStream stream = new BaseAbstractClientStream(
allocator,
state,
sink,
statsTraceCtx,
transportTracer,
CallOptions.DEFAULT.withOnReadyThreshold(10),
true);
ClientStreamListener listener = new NoopClientStreamListener();
stream.start(listener);
state.onStreamAllocated();

// Stream should be ready. 0 bytes are queued.
assertTrue(stream.isReady());

// Queue some bytes above the custom threshold and check that the stream is not ready.
stream.onSendingBytes(100);
assertFalse(stream.isReady());

// Simulate a flush and verify ready now.
stream.transportState().onSentBytes(91);
assertTrue(stream.isReady());
}

/**
* No-op base class for testing.
*/
Expand Down Expand Up @@ -517,9 +546,23 @@ public BaseAbstractClientStream(
StatsTraceContext statsTraceCtx,
TransportTracer transportTracer,
boolean useGet) {
super(allocator, statsTraceCtx, transportTracer, new Metadata(), CallOptions.DEFAULT, useGet);
this(allocator, state, sink, statsTraceCtx, transportTracer, CallOptions.DEFAULT, useGet);
}

public BaseAbstractClientStream(
WritableBufferAllocator allocator,
TransportState state,
Sink sink,
StatsTraceContext statsTraceCtx,
TransportTracer transportTracer,
CallOptions callOptions,
boolean useGet) {
super(allocator, statsTraceCtx, transportTracer, new Metadata(), callOptions, useGet);
this.state = state;
this.sink = sink;
if (callOptions.getOnReadyThreshold() != null) {
this.transportState().setOnReadyThreshold(callOptions.getOnReadyThreshold());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,11 @@ public StatsTraceContext statsTraceContext() {
public int streamId() {
return -1;
}

@Override
public void setOnReadyThreshold(int numBytes) {
// noop
}

Check warning on line 704 in inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java

View check run for this annotation

Codecov / codecov/patch

inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java#L704

Added line #L704 was not covered by tests
}

private class InProcessClientStream implements ClientStream {
Expand Down
3 changes: 3 additions & 0 deletions netty/src/main/java/io/grpc/netty/NettyClientStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ class NettyClientStream extends AbstractClientStream {
this.authority = checkNotNull(authority, "authority");
this.scheme = checkNotNull(scheme, "scheme");
this.userAgent = userAgent;
if (callOptions.getOnReadyThreshold() != null) {
this.setOnReadyThreshold(callOptions.getOnReadyThreshold());

Check warning on line 96 in netty/src/main/java/io/grpc/netty/NettyClientStream.java

View check run for this annotation

Codecov / codecov/patch

netty/src/main/java/io/grpc/netty/NettyClientStream.java#L96

Added line #L96 was not covered by tests
}
}

@Override
Expand Down
3 changes: 3 additions & 0 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ class OkHttpClientStream extends AbstractClientStream {
transport,
initialWindowSize,
method.getFullMethodName());
if (callOptions.getOnReadyThreshold() != null) {
this.setOnReadyThreshold(callOptions.getOnReadyThreshold());

Check warning on line 104 in okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java

View check run for this annotation

Codecov / codecov/patch

okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java#L104

Added line #L104 was not covered by tests
}
}

@Override
Expand Down
2 changes: 2 additions & 0 deletions stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public abstract class ServerCallStreamObserver<RespT> extends CallStreamObserver
*/
public abstract void setOnCancelHandler(Runnable onCancelHandler);

public abstract void setOnReadyThreshold(int numBytes);

/**
* Sets the compression algorithm to use for the call. May only be called before sending any
* messages. Default gRPC servers support the "gzip" compressor.
Expand Down
5 changes: 5 additions & 0 deletions stub/src/main/java/io/grpc/stub/ServerCalls.java
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,11 @@ public void onCompleted() {
completed = true;
}

@Override
public void setOnReadyThreshold(int numBytes) {
call.setOnReadyThreshold(numBytes);
}

Check warning on line 402 in stub/src/main/java/io/grpc/stub/ServerCalls.java

View check run for this annotation

Codecov / codecov/patch

stub/src/main/java/io/grpc/stub/ServerCalls.java#L401-L402

Added lines #L401 - L402 were not covered by tests

@Override
public boolean isReady() {
return call.isReady();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,16 @@ public void run() {
});
}

@Override
public void setOnReadyThreshold(final int numBytes) {
serializingExecutor.execute(new Runnable() {

Check warning on line 224 in util/src/main/java/io/grpc/util/TransmitStatusRuntimeExceptionInterceptor.java

View check run for this annotation

Codecov / codecov/patch

util/src/main/java/io/grpc/util/TransmitStatusRuntimeExceptionInterceptor.java#L224

Added line #L224 was not covered by tests
@Override
public void run() {
SerializingServerCall.super.setOnReadyThreshold(numBytes);
}

Check warning on line 228 in util/src/main/java/io/grpc/util/TransmitStatusRuntimeExceptionInterceptor.java

View check run for this annotation

Codecov / codecov/patch

util/src/main/java/io/grpc/util/TransmitStatusRuntimeExceptionInterceptor.java#L227-L228

Added lines #L227 - L228 were not covered by tests
});
}

Check warning on line 230 in util/src/main/java/io/grpc/util/TransmitStatusRuntimeExceptionInterceptor.java

View check run for this annotation

Codecov / codecov/patch

util/src/main/java/io/grpc/util/TransmitStatusRuntimeExceptionInterceptor.java#L230

Added line #L230 was not covered by tests

@Override
public void setCompression(final String compressor) {
serializingExecutor.execute(new Runnable() {
Expand Down

0 comments on commit 959473b

Please sign in to comment.