Skip to content

Commit

Permalink
Reuse message buffer for ClientStream
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood committed Dec 31, 2017
1 parent 65c901e commit ca30e36
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 4 deletions.
28 changes: 25 additions & 3 deletions rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,11 @@ type parser struct {
// The header of a gRPC message. Find more detail
// at https://grpc.io/docs/guides/wire.html.
header [5]byte

reuseBuf bool

// buf holds the memory large enough to read a message, so we don't need to allocate memory each time.
buf []byte
}

// recvMsg reads a complete gRPC message from the stream.
Expand Down Expand Up @@ -298,9 +303,13 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt
if int(length) > maxReceiveMessageSize {
return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize)
}
// TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
// of making it for each message:
msg = make([]byte, int(length))
if p.reuseBuf {
p.reallocBuf(int(length))
msg = p.buf[:length]
} else {
msg = make([]byte, int(length))
}

if _, err := p.r.Read(msg); err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
Expand All @@ -310,6 +319,19 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt
return pf, msg, nil
}

func (p *parser) reallocBuf(length int) {
if cap(p.buf) >= length {
return
}
allocLen := 8
for allocLen < length {
// This may allocate more memory than needed, but avoids allocating memory every time in the case when
// each message is slightly larger than the previous one.
allocLen *= 2
}
p.buf = make([]byte, allocLen)
}

// encode serializes msg and returns a buffer of message header and a buffer of msg.
// If msg is nil, it generates the message header and an empty msg buffer.
// TODO(ddyihai): eliminate extra Compressor parameter.
Expand Down
25 changes: 25 additions & 0 deletions rpc_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,31 @@ func TestToRPCErr(t *testing.T) {
}
}

func TestBufferReuse(t *testing.T) {
for _, test := range []struct {
reuse bool
sameBuf bool
}{
{true, true},
{false, false},
} {
data := []byte{0, 0, 0, 0, 1, 'a', 0, 0, 0, 0, 1, 'a'}
p := &parser{r: bytes.NewReader(data), reuseBuf: test.reuse}
_, msg, err := p.recvMsg(1)
if err != nil {
t.Fatalf("parser{%v}.recvMsg(_) = _, _, %v", p, err)
}
_, msg2, err := p.recvMsg(1)
if err != nil {
t.Fatalf("parser{%v}.recvMsg(_) = _, _, %v", p, err)
}
sameBuffer := &msg[0] == &msg2[0]
if sameBuffer != test.sameBuf {
t.Fatalf("bufferReuse sameBuf = %v, want %v", sameBuffer, test.sameBuf)
}
}
}

// bmEncode benchmarks encoding a Protocol Buffer message containing mSize
// bytes.
func bmEncode(b *testing.B, mSize int) {
Expand Down
2 changes: 1 addition & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
done: done,
t: t,
s: s,
p: &parser{r: s},
p: &parser{r: s, reuseBuf: true},

tracing: EnableTracing,
trInfo: trInfo,
Expand Down

0 comments on commit ca30e36

Please sign in to comment.