diff --git a/api/src/main/java/io/grpc/CallOptions.java b/api/src/main/java/io/grpc/CallOptions.java index 87493d2ba0b8..cc1267718c1a 100644 --- a/api/src/main/java/io/grpc/CallOptions.java +++ b/api/src/main/java/io/grpc/CallOptions.java @@ -79,6 +79,8 @@ public final class CallOptions { private final Integer maxInboundMessageSize; @Nullable private final Integer maxOutboundMessageSize; + @Nullable + private final Integer onReadyThreshold; private CallOptions(Builder builder) { this.deadline = builder.deadline; @@ -91,6 +93,7 @@ private CallOptions(Builder builder) { this.waitForReady = builder.waitForReady; this.maxInboundMessageSize = builder.maxInboundMessageSize; this.maxOutboundMessageSize = builder.maxOutboundMessageSize; + this.onReadyThreshold = builder.onReadyThreshold; } static class Builder { @@ -105,6 +108,7 @@ static class Builder { Boolean waitForReady; Integer maxInboundMessageSize; Integer maxOutboundMessageSize; + Integer onReadyThreshold; private CallOptions build() { return new CallOptions(this); @@ -193,6 +197,10 @@ public CallOptions withWaitForReady() { return builder.build(); } + public Integer getOnReadyThreshold() { + return onReadyThreshold; + } + /** * Disables 'wait for ready' feature for the call. * This method should be rarely used because the default is without 'wait for ready'. @@ -203,6 +211,18 @@ public CallOptions withoutWaitForReady() { return builder.build(); } + public CallOptions withOnReadyThreshold(int numBytes) { + Builder builder = toBuilder(this); + builder.onReadyThreshold = numBytes; + return builder.build(); + } + + public CallOptions clearOnReadyThreshold() { + Builder builder = toBuilder(this); + builder.onReadyThreshold = null; + return builder.build(); + } + /** * Returns the compressor's name. */ diff --git a/api/src/main/java/io/grpc/PartialForwardingServerCall.java b/api/src/main/java/io/grpc/PartialForwardingServerCall.java index a7da647308b1..234eec1199f0 100644 --- a/api/src/main/java/io/grpc/PartialForwardingServerCall.java +++ b/api/src/main/java/io/grpc/PartialForwardingServerCall.java @@ -58,6 +58,11 @@ public void setMessageCompression(boolean enabled) { delegate().setMessageCompression(enabled); } + @Override + public void setOnReadyThreshold(int numBytes) { + delegate().setOnReadyThreshold(numBytes); + } + @Override @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1704") public void setCompression(String compressor) { diff --git a/api/src/main/java/io/grpc/ServerCall.java b/api/src/main/java/io/grpc/ServerCall.java index 7408479a2305..3afa0a02dc80 100644 --- a/api/src/main/java/io/grpc/ServerCall.java +++ b/api/src/main/java/io/grpc/ServerCall.java @@ -209,6 +209,10 @@ public void setCompression(String compressor) { // noop } + public void setOnReadyThreshold(int numBytes) { + // noop + } + /** * Returns the level of security guarantee in communications * diff --git a/core/src/main/java/io/grpc/internal/AbstractServerStream.java b/core/src/main/java/io/grpc/internal/AbstractServerStream.java index d781cfa9b8a9..b5c1badf0645 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerStream.java @@ -177,6 +177,11 @@ public StatsTraceContext statsTraceContext() { return statsTraceCtx; } + @Override + public void setOnReadyThreshold(int numBytes) { + transportState().setOnReadyThreshold(numBytes); + } + /** * This should only be called from the transport thread (except for private interactions with * {@code AbstractServerStream}). @@ -243,6 +248,8 @@ public void deframerClosed(boolean hasPartialMessage) { } } + + @Override protected ServerStreamListener listener() { return listener; diff --git a/core/src/main/java/io/grpc/internal/AbstractStream.java b/core/src/main/java/io/grpc/internal/AbstractStream.java index cda08576eae5..e78a5bfbc5e8 100644 --- a/core/src/main/java/io/grpc/internal/AbstractStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractStream.java @@ -77,6 +77,10 @@ public final void flush() { } } + protected void setOnReadyThreshold(int numBytes) { + transportState().setOnReadyThreshold(numBytes); + } + /** * Closes the underlying framer. Should be called when the outgoing stream is gracefully closed * (half closure on client; closure on server). @@ -143,6 +147,9 @@ public abstract static class TransportState @GuardedBy("onReadyLock") private boolean deallocated; + @GuardedBy("onReadyLock") + private int onReadyThreshold; + protected TransportState( int maxMessageSize, StatsTraceContext statsTraceCtx, @@ -157,6 +164,7 @@ protected TransportState( transportTracer); // TODO(#7168): use MigratingThreadDeframer when enabling retry doesn't break. deframer = rawDeframer; + onReadyThreshold = DEFAULT_ONREADY_THRESHOLD; } final void optimizeForDirectExecutor() { @@ -178,6 +186,12 @@ final void setMaxInboundMessageSize(int maxSize) { */ protected abstract StreamListener listener(); + void setOnReadyThreshold(int numBytes) { + synchronized (onReadyLock) { + this.onReadyThreshold = numBytes; + } + } + @Override public void messagesAvailable(StreamListener.MessageProducer producer) { listener().messagesAvailable(producer); @@ -259,7 +273,7 @@ protected final void setDecompressor(Decompressor decompressor) { private boolean isReady() { synchronized (onReadyLock) { - return allocated && numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD && !deallocated; + return allocated && numSentBytesQueued < onReadyThreshold && !deallocated; } } @@ -316,9 +330,9 @@ public final void onSentBytes(int numBytes) { synchronized (onReadyLock) { checkState(allocated, "onStreamAllocated was not called, but it seems the stream is active"); - boolean belowThresholdBefore = numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD; + boolean belowThresholdBefore = numSentBytesQueued < onReadyThreshold; numSentBytesQueued -= numBytes; - boolean belowThresholdAfter = numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD; + boolean belowThresholdAfter = numSentBytesQueued < onReadyThreshold; doNotify = !belowThresholdBefore && belowThresholdAfter; } if (doNotify) { diff --git a/core/src/main/java/io/grpc/internal/ServerCallImpl.java b/core/src/main/java/io/grpc/internal/ServerCallImpl.java index 1bfee21e0550..dda36258e7cb 100644 --- a/core/src/main/java/io/grpc/internal/ServerCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerCallImpl.java @@ -184,6 +184,11 @@ public void setMessageCompression(boolean enable) { stream.setMessageCompression(enable); } + @Override + public void setOnReadyThreshold(int numBytes) { + stream.setOnReadyThreshold(numBytes); + } + @Override public void setCompression(String compressorName) { // Added here to give a better error message. diff --git a/core/src/main/java/io/grpc/internal/ServerStream.java b/core/src/main/java/io/grpc/internal/ServerStream.java index 861d5f36cc7e..9934b5ec1bff 100644 --- a/core/src/main/java/io/grpc/internal/ServerStream.java +++ b/core/src/main/java/io/grpc/internal/ServerStream.java @@ -96,4 +96,11 @@ public interface ServerStream extends Stream { * The HTTP/2 stream id, or {@code -1} if not supported. */ int streamId(); + + /** + * A hint to the stream that specifies how many bytes must be queued before + * {@link StreamListener#onReady()} will be called. A stream may ignore this property if + * unsupported. + */ + void setOnReadyThreshold(int numBytes); } diff --git a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java index 4ce8a467d9f8..98c2c515b2f7 100644 --- a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java @@ -20,6 +20,7 @@ import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED; import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; @@ -482,6 +483,34 @@ public void appendTimeoutInsight() { assertThat(insight.toString()).isEqualTo("[remote_addr=fake_server_addr]"); } + @Test + public void overrideOnReadyThreshold() { + AbstractClientStream.Sink sink = mock(AbstractClientStream.Sink.class); + BaseTransportState state = new BaseTransportState(statsTraceCtx, transportTracer); + AbstractClientStream stream = new BaseAbstractClientStream( + allocator, + state, + sink, + statsTraceCtx, + transportTracer, + CallOptions.DEFAULT.withOnReadyThreshold(10), + true); + ClientStreamListener listener = new NoopClientStreamListener(); + stream.start(listener); + state.onStreamAllocated(); + + // Stream should be ready. 0 bytes are queued. + assertTrue(stream.isReady()); + + // Queue some bytes above the custom threshold and check that the stream is not ready. + stream.onSendingBytes(100); + assertFalse(stream.isReady()); + + // Simulate a flush and verify ready now. + stream.transportState().onSentBytes(91); + assertTrue(stream.isReady()); + } + /** * No-op base class for testing. */ @@ -517,9 +546,23 @@ public BaseAbstractClientStream( StatsTraceContext statsTraceCtx, TransportTracer transportTracer, boolean useGet) { - super(allocator, statsTraceCtx, transportTracer, new Metadata(), CallOptions.DEFAULT, useGet); + this(allocator, state, sink, statsTraceCtx, transportTracer, CallOptions.DEFAULT, useGet); + } + + public BaseAbstractClientStream( + WritableBufferAllocator allocator, + TransportState state, + Sink sink, + StatsTraceContext statsTraceCtx, + TransportTracer transportTracer, + CallOptions callOptions, + boolean useGet) { + super(allocator, statsTraceCtx, transportTracer, new Metadata(), callOptions, useGet); this.state = state; this.sink = sink; + if (callOptions.getOnReadyThreshold() != null) { + this.transportState().setOnReadyThreshold(callOptions.getOnReadyThreshold()); + } } @Override diff --git a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java index 91e519f9efc7..f091714874cc 100644 --- a/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/inprocess/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -697,6 +697,11 @@ public StatsTraceContext statsTraceContext() { public int streamId() { return -1; } + + @Override + public void setOnReadyThreshold(int numBytes) { + // noop + } } private class InProcessClientStream implements ClientStream { diff --git a/netty/src/main/java/io/grpc/netty/NettyClientStream.java b/netty/src/main/java/io/grpc/netty/NettyClientStream.java index 0c0bb7eeb8d4..54bc1ae429a2 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientStream.java @@ -92,6 +92,9 @@ class NettyClientStream extends AbstractClientStream { this.authority = checkNotNull(authority, "authority"); this.scheme = checkNotNull(scheme, "scheme"); this.userAgent = userAgent; + if (callOptions.getOnReadyThreshold() != null) { + this.setOnReadyThreshold(callOptions.getOnReadyThreshold()); + } } @Override diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java index 50de8c7002fa..c9d6da917f74 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java @@ -100,6 +100,9 @@ class OkHttpClientStream extends AbstractClientStream { transport, initialWindowSize, method.getFullMethodName()); + if (callOptions.getOnReadyThreshold() != null) { + this.setOnReadyThreshold(callOptions.getOnReadyThreshold()); + } } @Override diff --git a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java index 8201a2305462..79ec61423670 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java +++ b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java @@ -64,6 +64,8 @@ public abstract class ServerCallStreamObserver extends CallStreamObserver */ public abstract void setOnCancelHandler(Runnable onCancelHandler); + public abstract void setOnReadyThreshold(int numBytes); + /** * Sets the compression algorithm to use for the call. May only be called before sending any * messages. Default gRPC servers support the "gzip" compressor. diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java index 83954af9670d..bbfd3262b11c 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCalls.java +++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java @@ -396,6 +396,11 @@ public void onCompleted() { completed = true; } + @Override + public void setOnReadyThreshold(int numBytes) { + call.setOnReadyThreshold(numBytes); + } + @Override public boolean isReady() { return call.isReady(); diff --git a/util/src/main/java/io/grpc/util/TransmitStatusRuntimeExceptionInterceptor.java b/util/src/main/java/io/grpc/util/TransmitStatusRuntimeExceptionInterceptor.java index bead2be4e9e9..750a36c52857 100644 --- a/util/src/main/java/io/grpc/util/TransmitStatusRuntimeExceptionInterceptor.java +++ b/util/src/main/java/io/grpc/util/TransmitStatusRuntimeExceptionInterceptor.java @@ -219,6 +219,16 @@ public void run() { }); } + @Override + public void setOnReadyThreshold(final int numBytes) { + serializingExecutor.execute(new Runnable() { + @Override + public void run() { + SerializingServerCall.super.setOnReadyThreshold(numBytes); + } + }); + } + @Override public void setCompression(final String compressor) { serializingExecutor.execute(new Runnable() {