Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transport: consume per-stream inflow windows ahead of time - large speedup for big messages #1073

Closed
wants to merge 8 commits into from
6 changes: 3 additions & 3 deletions rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ type parser struct {
// r is the underlying reader.
// See the comment on recvMsg for the permissible
// error types.
r io.Reader
r transport.FullReader

// The header of a gRPC message. Find more detail
// at http://www.grpc.io/docs/guides/wire.html.
Expand All @@ -242,7 +242,7 @@ type parser struct {
// that the underlying io.Reader must not return an incompatible
// error.
func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err error) {
if _, err := io.ReadFull(p.r, p.header[:]); err != nil {
if _, err := p.r.ReadFull(p.header[:]); err != nil {
return 0, nil, err
}

Expand All @@ -258,7 +258,7 @@ func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err erro
// TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
// of making it for each message:
msg = make([]byte, int(length))
if _, err := io.ReadFull(p.r, msg); err != nil {
if _, err := p.r.ReadFull(msg); err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
Expand Down
16 changes: 12 additions & 4 deletions rpc_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ import (
"google.golang.org/grpc/transport"
)

type fullReaderForTesting struct {
buf io.Reader
}

func (f *fullReaderForTesting) ReadFull(p []byte) (int, error) {
return io.ReadFull(f.buf, p)
}

func TestSimpleParsing(t *testing.T) {
bigMsg := bytes.Repeat([]byte{'x'}, 1<<24)
for _, test := range []struct {
Expand All @@ -65,8 +73,8 @@ func TestSimpleParsing(t *testing.T) {
// Check that messages with length >= 2^24 are parsed.
{append([]byte{0, 1, 0, 0, 0}, bigMsg...), nil, bigMsg, compressionNone},
} {
buf := bytes.NewReader(test.p)
parser := &parser{r: buf}
fullReader := &fullReaderForTesting{buf: bytes.NewReader(test.p)}
parser := &parser{r: fullReader}
pt, b, err := parser.recvMsg(math.MaxInt32)
if err != test.err || !bytes.Equal(b, test.b) || pt != test.pt {
t.Fatalf("parser{%v}.recvMsg(_) = %v, %v, %v\nwant %v, %v, %v", test.p, pt, b, err, test.pt, test.b, test.err)
Expand All @@ -77,8 +85,8 @@ func TestSimpleParsing(t *testing.T) {
func TestMultipleParsing(t *testing.T) {
// Set a byte stream consists of 3 messages with their headers.
p := []byte{0, 0, 0, 0, 1, 'a', 0, 0, 0, 0, 2, 'b', 'c', 0, 0, 0, 0, 1, 'd'}
b := bytes.NewReader(p)
parser := &parser{r: b}
fullReader := &fullReaderForTesting{buf: bytes.NewReader(p)}
parser := &parser{r: fullReader}

wantRecvs := []struct {
pt payloadFormat
Expand Down
Loading