From 1250a012d43fe7a1d02141d4f6a069fc6669285f Mon Sep 17 00:00:00 2001 From: Adhityaa Chandrasekar Date: Fri, 8 Nov 2019 10:46:43 -0800 Subject: [PATCH] codec/proto: allow reuse of marshal byte buffers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Performance benchmarks can be found below. Obviously, a 10KB request and 10KB response is tailored to showcase this improvement as this is where codec buffer re-use shines, but I've run other benchmarks too (like 1-byte requests and responses) and there's no discernible impact on performance. To no one's surprise, the number of bytes allocated per operation goes down by almost exactly 10 KB across 60k+ queries, which suggests excellent buffer re-use. The number of allocations itself increases by 5-ish, but that's probably because of a few additional slice pointers that we need to store; these are 8-byte allocations and should have virtually no impact on the allocator and garbage collector. We do not allow reuse of buffers when stats handlers or binlogs are turned on. This is because those two may need access to the data and payload even after the data has been written to the wire. In such cases, we never return the data back to the pool. 
streaming-networkMode_none-bufConn_false-keepalive_false-benchTime_1m0s-trace_false-latency_0s-kbps_0-MTU_0-maxConcurrentCalls_1-reqSize_10240B-respSize_10240B-compressor_off-channelz_false-preloader_false Title Before After Percentage TotalOps 370480 372395 0.52% SendOps 0 0 NaN% RecvOps 0 0 NaN% Bytes/op 116049.91 105488.90 -9.10% Allocs/op 111.59 118.27 6.27% ReqT/op 505828693.33 508443306.67 0.52% RespT/op 505828693.33 508443306.67 0.52% 50th-Lat 142.553µs 143.951µs 0.98% 90th-Lat 193.714µs 192.51µs -0.62% 99th-Lat 549.345µs 545.059µs -0.78% Avg-Lat 161.506µs 160.635µs -0.54% Closes https://github.com/grpc/grpc-go/issues/2816 --- codec.go | 16 +++++++ encoding/encoding.go | 16 +++++++ encoding/proto/proto.go | 70 ++++++++++++++++-------------- internal/transport/controlbuf.go | 9 +++- internal/transport/http2_client.go | 5 ++- internal/transport/http2_server.go | 9 ++-- internal/transport/transport.go | 3 ++ preloader.go | 15 +++++-- server.go | 11 ++++- stream.go | 64 ++++++++++++++++++++------- 10 files changed, 159 insertions(+), 59 deletions(-) diff --git a/codec.go b/codec.go index 129776547811..37fee7e59e85 100644 --- a/codec.go +++ b/codec.go @@ -31,6 +31,22 @@ type baseCodec interface { Unmarshal(data []byte, v interface{}) error } +// A bufferedBaseCodec is exactly like a baseCodec, but also requires a +// ReturnBuffer method to be implemented. Once a Marshal caller is done with +// the returned byte buffer, they can choose to return it back to the encoding +// library for re-use using this method. +type bufferedBaseCodec interface { + baseCodec + // If implemented in a codec, this function may be called with the byte + // buffer returned by Marshal after gRPC is done with the buffer. + // + // gRPC will not call ReturnBuffer after it's done with the buffer if any of + // the following is true: + // 1. Stats handlers are used. + // 2. Binlogs are enabled. 
+ ReturnBuffer(buf []byte) +} + var _ baseCodec = Codec(nil) var _ baseCodec = encoding.Codec(nil) diff --git a/encoding/encoding.go b/encoding/encoding.go index 195e8448b646..f3fba4d44e07 100644 --- a/encoding/encoding.go +++ b/encoding/encoding.go @@ -86,6 +86,22 @@ type Codec interface { Name() string } +// A BufferedCodec is exactly like a Codec, but also requires a ReturnBuffer +// method to be implemented. Once a Marshal caller is done with the returned +// byte buffer, they can choose to return it back to the encoding library for +// re-use using this method. +type BufferedCodec interface { + Codec + // If implemented in a codec, this function may be called with the byte + // buffer returned by Marshal after gRPC is done with the buffer. + // + // gRPC will not call ReturnBuffer after it's done with the buffer if any of + // the following is true: + // 1. Stats handlers are used. + // 2. Binlogs are enabled. + ReturnBuffer(buf []byte) +} + var registeredCodecs = make(map[string]Codec) // RegisterCodec registers the provided Codec for use with all gRPC clients and diff --git a/encoding/proto/proto.go b/encoding/proto/proto.go index 66b97a6f692a..bd4085dbb60d 100644 --- a/encoding/proto/proto.go +++ b/encoding/proto/proto.go @@ -21,7 +21,6 @@ package proto import ( - "math" "sync" "github.com/golang/protobuf/proto" @@ -38,29 +37,16 @@ func init() { // codec is a Codec implementation with protobuf. It is the default codec for gRPC. 
type codec struct{} -type cachedProtoBuffer struct { - lastMarshaledSize uint32 - proto.Buffer -} - -func capToMaxInt32(val int) uint32 { - if val > math.MaxInt32 { - return uint32(math.MaxInt32) - } - return uint32(val) -} - -func marshal(v interface{}, cb *cachedProtoBuffer) ([]byte, error) { +func marshal(v interface{}, pb *proto.Buffer) ([]byte, error) { protoMsg := v.(proto.Message) - newSlice := make([]byte, 0, cb.lastMarshaledSize) + newSlice := returnBufferPool.Get().([]byte) - cb.SetBuf(newSlice) - cb.Reset() - if err := cb.Marshal(protoMsg); err != nil { + pb.SetBuf(newSlice) + pb.Reset() + if err := pb.Marshal(protoMsg); err != nil { return nil, err } - out := cb.Bytes() - cb.lastMarshaledSize = capToMaxInt32(len(out)) + out := pb.Bytes() return out, nil } @@ -70,12 +56,12 @@ func (codec) Marshal(v interface{}) ([]byte, error) { return pm.Marshal() } - cb := protoBufferPool.Get().(*cachedProtoBuffer) - out, err := marshal(v, cb) + pb := protoBufferPool.Get().(*proto.Buffer) + out, err := marshal(v, pb) // put back buffer and lose the ref to the slice - cb.SetBuf(nil) - protoBufferPool.Put(cb) + pb.SetBuf(nil) + protoBufferPool.Put(pb) return out, err } @@ -88,23 +74,41 @@ func (codec) Unmarshal(data []byte, v interface{}) error { return pu.Unmarshal(data) } - cb := protoBufferPool.Get().(*cachedProtoBuffer) - cb.SetBuf(data) - err := cb.Unmarshal(protoMsg) - cb.SetBuf(nil) - protoBufferPool.Put(cb) + pb := protoBufferPool.Get().(*proto.Buffer) + pb.SetBuf(data) + err := pb.Unmarshal(protoMsg) + pb.SetBuf(nil) + protoBufferPool.Put(pb) return err } +func (codec) ReturnBuffer(data []byte) { + // Make sure we set the length of the buffer to zero so that future appends + // will start from the zeroth byte, not append to the previous, stale data. + // + // Apparently, sync.Pool with non-pointer objects (slices, in this case) + // causes small allocations because of how interface{} works under the hood. 
+ // This isn't a problem for us, however, because we're more concerned with + // _how_ much that allocation is. Ideally, we'd be using bytes.Buffer as the + // Marshal return value to remove even that allocation, but we can't change + // the Marshal interface at this point, so disable SA6002. + + //lint:ignore SA6002 + returnBufferPool.Put(data[:0]) +} + func (codec) Name() string { return Name } var protoBufferPool = &sync.Pool{ New: func() interface{} { - return &cachedProtoBuffer{ - Buffer: proto.Buffer{}, - lastMarshaledSize: 16, - } + return &proto.Buffer{} + }, +} + +var returnBufferPool = &sync.Pool{ + New: func() interface{} { + return make([]byte, 0, 16) }, } diff --git a/internal/transport/controlbuf.go b/internal/transport/controlbuf.go index ddee20b6bef2..9fc37f5dcf50 100644 --- a/internal/transport/controlbuf.go +++ b/internal/transport/controlbuf.go @@ -135,7 +135,8 @@ type dataFrame struct { d []byte // onEachWrite is called every time // a part of d is written out. - onEachWrite func() + onEachWrite func() + returnBuffer func() } func (*dataFrame) isTransportResponseFrame() bool { return false } @@ -841,6 +842,9 @@ func (l *loopyWriter) processData() (bool, error) { if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil { return false, err } + if dataItem.returnBuffer != nil { + dataItem.returnBuffer() + } str.itl.dequeue() // remove the empty data item from stream if str.itl.isEmpty() { str.state = empty @@ -907,6 +911,9 @@ func (l *loopyWriter) processData() (bool, error) { if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out. 
str.itl.dequeue() + if dataItem.returnBuffer != nil { + dataItem.returnBuffer() + } } if str.itl.isEmpty() { str.state = empty diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 294661a3f337..e4b8aa1dcdc9 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -832,8 +832,9 @@ func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) e return errStreamDone } df := &dataFrame{ - streamID: s.id, - endStream: opts.Last, + streamID: s.id, + endStream: opts.Last, + returnBuffer: opts.ReturnBuffer, } if hdr != nil || data != nil { // If it's not an empty data frame. // Add some data to grpc message header so that we can equally diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 07603836468f..37baa35fefc5 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -908,10 +908,11 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e hdr = append(hdr, data[:emptyLen]...) data = data[emptyLen:] df := &dataFrame{ - streamID: s.id, - h: hdr, - d: data, - onEachWrite: t.setResetPingStrikes, + streamID: s.id, + h: hdr, + d: data, + onEachWrite: t.setResetPingStrikes, + returnBuffer: opts.ReturnBuffer, } if err := s.wq.get(int32(len(hdr) + len(data))); err != nil { select { diff --git a/internal/transport/transport.go b/internal/transport/transport.go index bfab940bd0de..b4a7c46b6b2a 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -586,6 +586,9 @@ type Options struct { // Last indicates whether this write is the last piece for // this stream. Last bool + // If non-nil, ReturnBuffer should be called to return some allocated buffer + // back to a sync pool. + ReturnBuffer func() } // CallHdr carries the information of a particular RPC. 
diff --git a/preloader.go b/preloader.go index 76acbbcc93b9..3eea4fc576d7 100644 --- a/preloader.go +++ b/preloader.go @@ -28,9 +28,10 @@ import ( // This API is EXPERIMENTAL. type PreparedMsg struct { // Struct for preparing msg before sending them - encodedData []byte - hdr []byte - payload []byte + encodedData []byte + hdr []byte + payload []byte + returnBuffer func() } // Encode marshalls and compresses the message using the codec and compressor for the stream. @@ -55,10 +56,18 @@ func (p *PreparedMsg) Encode(s Stream, msg interface{}) error { return err } p.encodedData = data + + if bcodec, ok := rpcInfo.preloaderInfo.codec.(bufferedBaseCodec); ok { + p.returnBuffer = func() { + bcodec.ReturnBuffer(data) + } + } + compData, err := compress(data, rpcInfo.preloaderInfo.cp, rpcInfo.preloaderInfo.comp) if err != nil { return err } + p.hdr, p.payload = msgHeader(data, compData) return nil } diff --git a/server.go b/server.go index e54083d850c1..55513609b4ab 100644 --- a/server.go +++ b/server.go @@ -842,16 +842,25 @@ func (s *Server) incrCallsFailed() { } func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error { - data, err := encode(s.getCodec(stream.ContentSubtype()), msg) + codec := s.getCodec(stream.ContentSubtype()) + data, err := encode(codec, msg) if err != nil { grpclog.Errorln("grpc: server failed to encode response: ", err) return err } + + if bcodec, ok := codec.(bufferedBaseCodec); ok { + opts.ReturnBuffer = func() { + bcodec.ReturnBuffer(data) + } + } + compData, err := compress(data, cp, comp) if err != nil { grpclog.Errorln("grpc: server failed to compress response: ", err) return err } + hdr, payload := msgHeader(data, compData) // TODO(dfawley): should we be checking len(data) instead? 
if len(payload) > s.opts.maxSendMessageSize { diff --git a/stream.go b/stream.go index bb99940e36fe..aafd72694b62 100644 --- a/stream.go +++ b/stream.go @@ -696,18 +696,26 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { } // load hdr, payload, data - hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp) + hdr, payload, data, returnBuffer, err := prepareMsg(m, cs.codec, cs.cp, cs.comp) if err != nil { return err } + // If binlog is turned on, we cannot return the buffer back to the encoder's + // pool. This is because we don't know which will be done first - loopy + // writer or this function. Returning the buffer in one before the other + // finishes is incorrect. + if cs.binlog != nil { + returnBuffer = nil + } + // TODO(dfawley): should we be checking len(data) instead? if len(payload) > *cs.callInfo.maxSendMessageSize { return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize) } msgBytes := data // Store the pointer before setting to nil. For binary logging. op := func(a *csAttempt) error { - err := a.sendMsg(m, hdr, payload, data) + err := a.sendMsg(m, hdr, payload, data, returnBuffer) // nil out the message and uncomp when replaying; they are only needed for // stats which is disabled for subsequent attempts. m, data = nil, nil @@ -833,7 +841,7 @@ func (cs *clientStream) finish(err error) { cs.cancel() } -func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error { +func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte, returnBuffer func()) error { cs := a.cs if a.trInfo != nil { a.mu.Lock() @@ -842,7 +850,16 @@ func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error { } a.mu.Unlock() } - if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil { + + // If stats are turned on, we cannot return the buffer back to the encoder's + // pool. 
This is because we don't know which will be done first - loopy + // writer or this function. Returning the buffer in one before the other + // finishes is incorrect. + if a.statsHandler != nil { + returnBuffer = nil + } + + if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams, ReturnBuffer: returnBuffer}); err != nil { if !cs.desc.ClientStreams { // For non-client-streaming RPCs, we return nil instead of EOF on error // because the generated code requires it. finish is not called; RecvMsg() @@ -1165,8 +1182,8 @@ func (as *addrConnStream) SendMsg(m interface{}) (err error) { as.sentLast = true } - // load hdr, payload, data - hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp) + // load hdr, payload, data, returnBuffer + hdr, payld, _, returnBuffer, err := prepareMsg(m, as.codec, as.cp, as.comp) if err != nil { return err } @@ -1176,7 +1193,7 @@ func (as *addrConnStream) SendMsg(m interface{}) (err error) { return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize) } - if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil { + if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams, ReturnBuffer: returnBuffer}); err != nil { if !as.desc.ClientStreams { // For non-client-streaming RPCs, we return nil instead of EOF on error // because the generated code requires it. finish is not called; RecvMsg() @@ -1408,17 +1425,25 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) { } }() - // load hdr, payload, data - hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp) + // load hdr, payload, data, returnBuffer + hdr, payload, data, returnBuffer, err := prepareMsg(m, ss.codec, ss.cp, ss.comp) if err != nil { return err } + // If binlog or stats is turned on, we cannot return the buffer back to the + // encoder's pool. 
This is because we don't know which will be done first - + // loopy writer or this function. Returning the buffer in one before the + // other finishes is incorrect. + if ss.binlog != nil || ss.statsHandler != nil { + returnBuffer = nil + } + // TODO(dfawley): should we be checking len(data) instead? if len(payload) > ss.maxSendMessageSize { return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize) } - if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil { + if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false, ReturnBuffer: returnBuffer}); err != nil { return toRPCErr(err) } if ss.binlog != nil { @@ -1510,20 +1535,29 @@ func MethodFromServerStream(stream ServerStream) (string, bool) { // prepareMsg returns the hdr, payload and data // using the compressors passed or using the // passed preparedmsg -func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) { +func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, returnBuffer func(), err error) { if preparedMsg, ok := m.(*PreparedMsg); ok { - return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil + return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, preparedMsg.returnBuffer, nil } + // The input interface is not a prepared msg. 
// Marshal and Compress the data at this point data, err = encode(codec, m) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err + } + + if bcodec, ok := codec.(bufferedBaseCodec); ok { + returnBuffer = func() { + bcodec.ReturnBuffer(data) + } } + compData, err := compress(data, cp, comp) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } + hdr, payload = msgHeader(data, compData) - return hdr, payload, data, nil + return hdr, payload, data, returnBuffer, nil }