Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse message buffer for streaming #1774

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,11 @@ type parser struct {
// The header of a gRPC message. Find more detail at
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
header [5]byte

reuseBuf bool

// buf holds the memory large enough to read a message, so we don't need to allocate memory each time.
buf []byte
}

// recvMsg reads a complete gRPC message from the stream.
Expand Down Expand Up @@ -349,9 +354,13 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt
if int(length) > maxReceiveMessageSize {
return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize)
}
// TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
// of making it for each message:
msg = make([]byte, int(length))
if p.reuseBuf {
p.reallocBuf(int(length))
msg = p.buf[:length]
} else {
msg = make([]byte, int(length))
}

if _, err := p.r.Read(msg); err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
Expand All @@ -361,6 +370,19 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt
return pf, msg, nil
}

func (p *parser) reallocBuf(length int) {
if cap(p.buf) >= length {
return
}
allocLen := 8
for allocLen < length {
// This may allocate more memory than needed, but avoids allocating memory every time in the case when
// each message is slightly larger than the previous one.
allocLen *= 2
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about:

allocLen := int(math.Pow(2, math.Ceil(math.Log2(length))+1))

Copy link
Contributor Author

@coocood coocood Jan 26, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MakMukhi
I did a micro benchmark, using math functions takes more than 100ns but multiply by 2 in a for loop takes less than 10ns.
For growing 64B to 1MB.

p.buf = make([]byte, allocLen)
}

// encode serializes msg and returns a buffer of message header and a buffer of msg.
// If msg is nil, it generates the message header and an empty msg buffer.
// TODO(ddyihai): eliminate extra Compressor parameter.
Expand Down
24 changes: 24 additions & 0 deletions rpc_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,30 @@ func TestToRPCErr(t *testing.T) {
}
}

func TestBufferReuse(t *testing.T) {
for _, test := range []struct {
reuse bool
}{
{true},
{false},
} {
data := []byte{0, 0, 0, 0, 1, 'a', 0, 0, 0, 0, 1, 'a'}
p := &parser{r: bytes.NewReader(data), reuseBuf: test.reuse}
_, msg, err := p.recvMsg(1)
if err != nil {
t.Fatalf("parser{%v}.recvMsg(_) = _, _, %v", p, err)
}
_, msg2, err := p.recvMsg(1)
if err != nil {
t.Fatalf("parser{%v}.recvMsg(_) = _, _, %v", p, err)
}
sameBuffer := &msg[0] == &msg2[0]
if sameBuffer != test.reuse {
t.Fatalf("bufferReuse sameBuffer %v should be equal to reuse %v.", sameBuffer, test.reuse)
}
}
}

// bmEncode benchmarks encoding a Protocol Buffer message containing mSize
// bytes.
func bmEncode(b *testing.B, mSize int) {
Expand Down
11 changes: 10 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ type options struct {
writeBufferSize int
readBufferSize int
connectionTimeout time.Duration
disableReqBuf bool
}

var defaultServerOptions = options{
Expand Down Expand Up @@ -323,6 +324,14 @@ func ConnectionTimeout(d time.Duration) ServerOption {
}
}

// DisableRequestBuffer returns a ServerOption that disables the request buffer.
// The request buffer improves the performance but may consume much more memory in certain cases.
func DisableRequestBuffer() ServerOption {
return func(o *options) {
o.disableReqBuf = true
}
}

// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
Expand Down Expand Up @@ -994,7 +1003,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
ss := &serverStream{
t: t,
s: stream,
p: &parser{r: stream},
p: &parser{r: stream, reuseBuf: !s.opts.disableReqBuf},
codec: s.getCodec(stream.ContentSubtype()),
maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
maxSendMessageSize: s.opts.maxSendMessageSize,
Expand Down
2 changes: 1 addition & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
done: done,
t: t,
s: s,
p: &parser{r: s},
p: &parser{r: s, reuseBuf: true},

tracing: EnableTracing,
trInfo: trInfo,
Expand Down