diff --git a/rpc_util.go b/rpc_util.go index 0fe501f05a05..c2c42f0152a2 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -266,6 +266,11 @@ type parser struct { // The header of a gRPC message. Find more detail // at https://grpc.io/docs/guides/wire.html. header [5]byte + + reuseBuf bool + + // buf holds the memory large enough to read a message, so we don't need to allocate memory each time. + buf []byte } // recvMsg reads a complete gRPC message from the stream. @@ -298,9 +303,13 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt if int(length) > maxReceiveMessageSize { return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize) } - // TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead - // of making it for each message: - msg = make([]byte, int(length)) + if p.reuseBuf { + p.reallocBuf(int(length)) + msg = p.buf[:length] + } else { + msg = make([]byte, int(length)) + } + if _, err := p.r.Read(msg); err != nil { if err == io.EOF { err = io.ErrUnexpectedEOF @@ -310,6 +319,19 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt return pf, msg, nil } +func (p *parser) reallocBuf(length int) { + if cap(p.buf) >= length { + return + } + allocLen := 8 + for allocLen < length { + // This may allocate more memory than needed, but avoids allocating memory every time in the case when + // each message is slightly larger than the previous one. + allocLen *= 2 + } + p.buf = make([]byte, allocLen) +} + // encode serializes msg and returns a buffer of message header and a buffer of msg. // If msg is nil, it generates the message header and an empty msg buffer. // TODO(ddyihai): eliminate extra Compressor parameter. diff --git a/rpc_util_test.go b/rpc_util_test.go index 23c471e2e407..ec3f15e2076a 100644 --- a/rpc_util_test.go +++ b/rpc_util_test.go @@ -161,6 +161,31 @@ func TestToRPCErr(t *testing.T) { } } +func TestBufferReuse(t *testing.T) { + for _, test := range []struct { + reuse bool + sameBuf bool + }{ + {true, true}, + {false, false}, + } { + data := []byte{0, 0, 0, 0, 1, 'a', 0, 0, 0, 0, 1, 'a'} + p := &parser{r: bytes.NewReader(data), reuseBuf: test.reuse} + _, msg, err := p.recvMsg(1) + if err != nil { + t.Fatalf("parser{%v}.recvMsg(_) = _, _, %v", p, err) + } + _, msg2, err := p.recvMsg(1) + if err != nil { + t.Fatalf("parser{%v}.recvMsg(_) = _, _, %v", p, err) + } + sameBuffer := &msg[0] == &msg2[0] + if sameBuffer != test.sameBuf { + t.Fatalf("bufferReuse sameBuf = %v, want %v", sameBuffer, test.sameBuf) + } + } +} + // bmEncode benchmarks encoding a Protocol Buffer message containing mSize // bytes. func bmEncode(b *testing.B, mSize int) { diff --git a/stream.go b/stream.go index f91381995102..93177d7f793a 100644 --- a/stream.go +++ b/stream.go @@ -270,7 +270,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth done: done, t: t, s: s, - p: &parser{r: s}, + p: &parser{r: s, reuseBuf: true}, tracing: EnableTracing, trInfo: trInfo,