transport: consume per-stream inflow windows ahead of time - large speedup for big messages #1073

Closed · wants to merge 8 commits
6 changes: 3 additions & 3 deletions rpc_util.go
@@ -221,7 +221,7 @@ type parser struct {
// r is the underlying reader.
// See the comment on recvMsg for the permissible
// error types.
-	r io.Reader
+	r transport.FullReader

// The header of a gRPC message. Find more detail
// at http://www.grpc.io/docs/guides/wire.html.
@@ -242,7 +242,7 @@ type parser struct {
// that the underlying io.Reader must not return an incompatible
// error.
func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err error) {
-	if _, err := io.ReadFull(p.r, p.header[:]); err != nil {
+	if _, err := p.r.ReadFull(p.header[:]); err != nil {
return 0, nil, err
}

@@ -258,7 +258,7 @@ func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err erro
// TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
// of making it for each message:
msg = make([]byte, int(length))
-	if _, err := io.ReadFull(p.r, msg); err != nil {
+	if _, err := p.r.ReadFull(msg); err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
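The transport.FullReader interface itself is outside this diff's hunks. Judging from the call sites above and the test adapter below, its shape is presumably along these lines (an inference, not quoted from the patch); funneling all reads through a single ReadFull call lets the transport see the size of a pending read up front, which is what allows it to grant window space for a large message before the bytes arrive:

```go
// Sketch only: presumed shape of transport.FullReader, inferred from the
// p.r.ReadFull(...) call sites above and the test adapter below; the real
// definition is not shown in this diff.
type FullReader interface {
	// ReadFull fills p completely, like io.ReadFull, returning the number
	// of bytes read and any error that prevented a full read.
	ReadFull(p []byte) (int, error)
}
```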
16 changes: 12 additions & 4 deletions rpc_util_test.go
@@ -47,6 +47,14 @@ import (
"google.golang.org/grpc/transport"
)

+type fullReaderForTesting struct {
+	buf io.Reader
+}
+
+func (f *fullReaderForTesting) ReadFull(p []byte) (int, error) {
+	return io.ReadFull(f.buf, p)
+}

func TestSimpleParsing(t *testing.T) {
bigMsg := bytes.Repeat([]byte{'x'}, 1<<24)
for _, test := range []struct {
@@ -65,8 +73,8 @@ func TestSimpleParsing(t *testing.T) {
// Check that messages with length >= 2^24 are parsed.
{append([]byte{0, 1, 0, 0, 0}, bigMsg...), nil, bigMsg, compressionNone},
} {
-		buf := bytes.NewReader(test.p)
-		parser := &parser{r: buf}
+		fullReader := &fullReaderForTesting{buf: bytes.NewReader(test.p)}
+		parser := &parser{r: fullReader}
pt, b, err := parser.recvMsg(math.MaxInt32)
if err != test.err || !bytes.Equal(b, test.b) || pt != test.pt {
t.Fatalf("parser{%v}.recvMsg(_) = %v, %v, %v\nwant %v, %v, %v", test.p, pt, b, err, test.pt, test.b, test.err)
@@ -77,8 +85,8 @@ func TestSimpleParsing(t *testing.T) {
func TestMultipleParsing(t *testing.T) {
	// Set up a byte stream consisting of 3 messages with their headers.
p := []byte{0, 0, 0, 0, 1, 'a', 0, 0, 0, 0, 2, 'b', 'c', 0, 0, 0, 0, 1, 'd'}
-	b := bytes.NewReader(p)
-	parser := &parser{r: b}
+	fullReader := &fullReaderForTesting{buf: bytes.NewReader(p)}
+	parser := &parser{r: fullReader}

wantRecvs := []struct {
pt payloadFormat
62 changes: 37 additions & 25 deletions test/end2end_test.go
@@ -2243,8 +2243,8 @@ func testCancelNoIO(t *testing.T, e env) {
// The following tests the gRPC streaming RPC implementations.
// TODO(zhaoq): Have better coverage on error cases.
var (
-	reqSizes  = []int{27182, 8, 1828, 45904}
-	respSizes = []int{31415, 9, 2653, 58979}
+	pingPongReqSizes  = []int{27182, 8, 1828, 45904}
+	pingPongRespSizes = []int{31415, 9, 2653, 58979}
)

func TestNoService(t *testing.T) {
@@ -2289,14 +2289,14 @@ func testPingPong(t *testing.T, e env) {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
var index int
-	for index < len(reqSizes) {
+	for index < len(pingPongReqSizes) {
respParam := []*testpb.ResponseParameters{
{
-				Size: proto.Int32(int32(respSizes[index])),
+				Size: proto.Int32(int32(pingPongRespSizes[index])),
},
}

-		payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index]))
+		payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(pingPongReqSizes[index]))
if err != nil {
t.Fatal(err)
}
@@ -2318,8 +2318,8 @@
t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE)
}
size := len(reply.GetPayload().GetBody())
-		if size != int(respSizes[index]) {
-			t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
+		if size != int(pingPongRespSizes[index]) {
+			t.Fatalf("Got reply body of length %d, want %d", size, pingPongRespSizes[index])
}
index++
}
@@ -2367,14 +2367,14 @@ func testMetadataStreamingRPC(t *testing.T, e env) {
t.Errorf("#2 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata)
}
var index int
-	for index < len(reqSizes) {
+	for index < len(pingPongReqSizes) {
respParam := []*testpb.ResponseParameters{
{
-				Size: proto.Int32(int32(respSizes[index])),
+				Size: proto.Int32(int32(pingPongRespSizes[index])),
},
}

-		payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index]))
+		payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(pingPongReqSizes[index]))
if err != nil {
t.Fatal(err)
}
@@ -2405,20 +2405,26 @@
}

func TestServerStreaming(t *testing.T) {
+	serverRespSizes := [][]int{
+		{27182, 8, 1828, 45904},
+		{(1 << 21), (1 << 21), (1 << 21), (1 << 21), (1 << 21), (1 << 21), (1 << 21), (1 << 21)},
+	}
defer leakCheck(t)()
-	for _, e := range listTestEnv() {
-		testServerStreaming(t, e)
+	for _, s := range serverRespSizes {
+		for _, e := range listTestEnv() {
+			testServerStreaming(t, e, s)
+		}
}
}

-func testServerStreaming(t *testing.T, e env) {
+func testServerStreaming(t *testing.T, e env, serverRespSizes []int) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())

-	respParam := make([]*testpb.ResponseParameters, len(respSizes))
-	for i, s := range respSizes {
+	respParam := make([]*testpb.ResponseParameters, len(serverRespSizes))
+	for i, s := range serverRespSizes {
respParam[i] = &testpb.ResponseParameters{
Size: proto.Int32(int32(s)),
}
@@ -2445,17 +2451,17 @@
t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE)
}
size := len(reply.GetPayload().GetBody())
-		if size != int(respSizes[index]) {
-			t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
+		if size != int(serverRespSizes[index]) {
+			t.Fatalf("Got reply body of length %d, want %d", size, serverRespSizes[index])
}
index++
respCnt++
}
if rpcStatus != io.EOF {
t.Fatalf("Failed to finish the server streaming rpc: %v, want <EOF>", rpcStatus)
}
-	if respCnt != len(respSizes) {
-		t.Fatalf("Got %d reply, want %d", len(respSizes), respCnt)
+	if respCnt != len(serverRespSizes) {
+		t.Fatalf("Got %d reply, want %d", len(serverRespSizes), respCnt)
}
}

@@ -2473,8 +2479,8 @@ func testFailedServerStreaming(t *testing.T, e env) {
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())

-	respParam := make([]*testpb.ResponseParameters, len(respSizes))
-	for i, s := range respSizes {
+	respParam := make([]*testpb.ResponseParameters, len(pingPongRespSizes))
+	for i, s := range pingPongRespSizes {
respParam[i] = &testpb.ResponseParameters{
Size: proto.Int32(int32(s)),
}
@@ -2576,12 +2582,18 @@ func testServerStreamingConcurrent(t *testing.T, e env) {

func TestClientStreaming(t *testing.T) {
defer leakCheck(t)()
-	for _, e := range listTestEnv() {
-		testClientStreaming(t, e)
+	clientReqSizes := [][]int{
+		{27182, 8, 1828, 45904},
+		{(1 << 21), (1 << 21), (1 << 21), (1 << 21), (1 << 21), (1 << 21), (1 << 21), (1 << 21)},
+	}
+	for _, s := range clientReqSizes {
+		for _, e := range listTestEnv() {
+			testClientStreaming(t, e, s)
+		}
}
}

-func testClientStreaming(t *testing.T, e env) {
+func testClientStreaming(t *testing.T, e env, clientReqSizes []int) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
defer te.tearDown()
@@ -2593,7 +2605,7 @@
}

var sum int
-	for _, s := range reqSizes {
+	for _, s := range clientReqSizes {
payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(s))
if err != nil {
t.Fatal(err)
60 changes: 55 additions & 5 deletions transport/control.go
@@ -39,6 +39,8 @@ import (
"sync"
"time"

"google.golang.org/grpc/grpclog"

"golang.org/x/net/http2"
)

@@ -58,6 +60,21 @@
defaultServerKeepaliveTime = time.Duration(2 * time.Hour)
defaultServerKeepaliveTimeout = time.Duration(20 * time.Second)
defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute)
+	// Put a cap on the max possible window update (this value is reached when
+	// an attempt to read a large message is made).
+	// 4M is greater than the connection window but is arbitrary otherwise.
+	// Note this must be greater than a stream's incoming window size to have an effect.
+	maxSingleStreamWindowUpdate = 4194303
+
+	// max legal window update
+	http2MaxWindowUpdate = 2147483647
+	// The fraction of an "inFlow" flow control window's limit at which accumulated
+	// "pending updates" should be flushed out and cause a window update to be sent.
+	// This number is arbitrary; limit/4 makes sure that the receiver isn't
+	// constantly busy sending window updates, but it also tries to avoid
+	// sending an update "too late" and causing the sender to stall.
+	// TODO: possibly tweaking this affects performance in some scenarios.
+	pendingUpdateThreshold = 4
)

// The following defines various control items which could flow through
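As a quick sanity check on the flush threshold above (an editor's illustration; the 65535-byte stream window is the usual HTTP/2 default, not something this diff sets):

```go
package main

import "fmt"

func main() {
	const initialWindowSize = 65535  // assumed default per-stream window
	const pendingUpdateThreshold = 4 // from the constants above

	// A WINDOW_UPDATE is flushed once the bytes consumed but not yet
	// announced to the peer reach limit/pendingUpdateThreshold.
	fmt.Println(initialWindowSize / pendingUpdateThreshold) // 16383 (~16KB)
}
```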
@@ -161,34 +178,51 @@ type inFlow struct {
limit uint32

mu sync.Mutex
-	// pendingData is the overall data which have been received but not been
+	// The overall data which has been received but not been
// consumed by applications.
pendingData uint32
// The amount of data the application has consumed but grpc has not sent
// window update for them. Used to reduce window update frequency.
pendingUpdate uint32

+	// This is temporary space in the incoming flow control that can be granted at convenient times
+	// to prevent the sender from stalling for lack of flow control space.
+	// If present, it is paid back when data is consumed from the window.
+	loanedWindowSpace uint32
}

// onData is invoked when some data frame is received. It updates pendingData.
func (f *inFlow) onData(n uint32) error {
f.mu.Lock()
defer f.mu.Unlock()
f.pendingData += n
-	if f.pendingData+f.pendingUpdate > f.limit {
-		return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit)
+	if f.pendingData+f.pendingUpdate > f.limit+f.loanedWindowSpace {
+		return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit+f.loanedWindowSpace)
}
return nil
}

+func min(a uint32, b uint32) uint32 {
+	if a < b {
+		return a
+	}
+	return b
+}

// onRead is invoked when the application reads the data. It returns the window size
// to be sent to the peer.
func (f *inFlow) onRead(n uint32) uint32 {
f.mu.Lock()
defer f.mu.Unlock()
-	if f.pendingData == 0 {
-		return 0
+	if n > http2MaxWindowUpdate {
+		grpclog.Fatalf("potential window update too large. onRead(n) where n is %v; max n is %v", f.pendingUpdate, http2MaxWindowUpdate)
[Review comment from a contributor] Should this be a fatalf? When can such a condition occur?
[Reply from the author] removed this
}
f.pendingData -= n
+	// first use up remaining "loanedWindowSpace", add remaining read to "pendingUpdate"
+	windowSpaceDebtPayment := min(n, f.loanedWindowSpace)
+	f.loanedWindowSpace -= windowSpaceDebtPayment
+	n -= windowSpaceDebtPayment

f.pendingUpdate += n
if f.pendingUpdate >= f.limit/4 {
wu := f.pendingUpdate
@@ -198,6 +232,22 @@ func (f *inFlow) onRead(n uint32) uint32 {
return 0
}

+func (f *inFlow) loanWindowSpace(n uint32) uint32 {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	if f.loanedWindowSpace > 0 {
+		grpclog.Fatalf("pre-consuming window space while there is pre-consumed window space still outstanding")
+	}
+	f.loanedWindowSpace = n
+
+	wu := f.pendingUpdate + f.loanedWindowSpace
+	if wu >= f.limit/pendingUpdateThreshold {
+		f.pendingUpdate = 0
+		return wu
+	}
+	return 0
+}

func (f *inFlow) resetPendingData() uint32 {
f.mu.Lock()
defer f.mu.Unlock()
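To make the loan/payback accounting above concrete, here is a minimal, self-contained simulation of one oversized message, using simplified copies of the inFlow methods from this patch. Locking and error checks are omitted, the 65535-byte limit is again an assumed default stream window rather than something this diff sets, and emitting the returned window updates is left to the caller, as in the real transport:

```go
package main

import "fmt"

// Simplified copy of inFlow's accounting (no locking, no error checks).
type inFlow struct {
	limit             uint32
	pendingData       uint32
	pendingUpdate     uint32
	loanedWindowSpace uint32
}

func min(a, b uint32) uint32 {
	if a < b {
		return a
	}
	return b
}

// loanWindowSpace pre-grants window for a read we know is coming; the
// returned value, if nonzero, is sent to the peer as a WINDOW_UPDATE.
func (f *inFlow) loanWindowSpace(n uint32) uint32 {
	f.loanedWindowSpace = n
	if wu := f.pendingUpdate + f.loanedWindowSpace; wu >= f.limit/4 {
		f.pendingUpdate = 0
		return wu
	}
	return 0
}

// onData tracks bytes that have arrived but not yet been consumed.
func (f *inFlow) onData(n uint32) { f.pendingData += n }

// onRead repays any outstanding loan first, so bytes whose window was
// already granted up front do not generate a second update.
func (f *inFlow) onRead(n uint32) uint32 {
	f.pendingData -= n
	pay := min(n, f.loanedWindowSpace)
	f.loanedWindowSpace -= pay
	n -= pay
	f.pendingUpdate += n
	if f.pendingUpdate >= f.limit/4 {
		wu := f.pendingUpdate
		f.pendingUpdate = 0
		return wu
	}
	return 0
}

func main() {
	f := &inFlow{limit: 65535} // assumed default per-stream window

	// About to read a 100000-byte message: grant the window up front so the
	// sender is not capped at 64KB per round trip while the app catches up.
	fmt.Println(f.loanWindowSpace(100000)) // 100000: sent immediately

	f.onData(65535) // sender fills the normal window...
	f.onData(34465) // ...and the loaned space, without ever stalling

	// The application consumes the message; those bytes repay the loan,
	// so no further window update is owed for them.
	fmt.Println(f.onRead(100000)) // 0
}
```

The point of the sequence: the 100000-byte grant goes out before the data is read, so the sender can stream past the stream window in one round trip, and consuming the message later repays the loan instead of generating a second update.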
23 changes: 15 additions & 8 deletions transport/handler_server.go
@@ -51,6 +51,7 @@ import (
"golang.org/x/net/http2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
@@ -273,6 +274,13 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
})
}

+type handlerServerStreamReader struct{}
+
+func (*handlerServerStreamReader) Read(_ []byte) (int, error) {
+	grpclog.Fatalf("handler server streamReader is unimplemented")
+	return 0, nil
+}

func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {
// With this transport type there will be exactly 1 stream: this HTTP request.

@@ -305,13 +313,13 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
req := ht.req

s := &Stream{
-		id:            0, // irrelevant
-		windowHandler: func(int) {}, // nothing
-		cancel:        cancel,
-		buf:           newRecvBuffer(),
-		st:            ht,
-		method:        req.URL.Path,
-		recvCompress:  req.Header.Get("grpc-encoding"),
+		id:           0, // irrelevant
+		cancel:       cancel,
+		buf:          newRecvBuffer(),
+		st:           ht,
+		method:       req.URL.Path,
+		recvCompress: req.Header.Get("grpc-encoding"),
+		streamReader: &handlerServerStreamReader{},
}
pr := &peer.Peer{
Addr: ht.RemoteAddr(),
@@ -322,7 +330,6 @@
ctx = metadata.NewContext(ctx, ht.headerMD)
ctx = peer.NewContext(ctx, pr)
s.ctx = newContextWithStream(ctx, s)
-	s.dec = &recvBufferReader{ctx: s.ctx, recv: s.buf}

// readerDone is closed when the Body.Read-ing goroutine exits.
readerDone := make(chan struct{})