diff --git a/codec.go b/codec.go index 129776547811..37fee7e59e85 100644 --- a/codec.go +++ b/codec.go @@ -31,6 +31,22 @@ type baseCodec interface { Unmarshal(data []byte, v interface{}) error } +// A bufferedBaseCodec is exactly like a baseCodec, but also requires a +// ReturnBuffer method to be implemented. Once a Marshal caller is done with +// the returned byte buffer, they can choose to return it back to the encoding +// library for re-use using this method. +type bufferedBaseCodec interface { + baseCodec + // If implemented in a codec, this function may be called with the byte + // buffer returned by Marshal after gRPC is done with the buffer. + // + // gRPC will not call ReturnBuffer after it's done with the buffer if any of + // the following is true: + // 1. Stats handlers are used. + // 2. Binlogs are enabled. + ReturnBuffer(buf []byte) +} + var _ baseCodec = Codec(nil) var _ baseCodec = encoding.Codec(nil) diff --git a/encoding/encoding.go b/encoding/encoding.go index 195e8448b646..f3fba4d44e07 100644 --- a/encoding/encoding.go +++ b/encoding/encoding.go @@ -86,6 +86,22 @@ type Codec interface { Name() string } +// A BufferedCodec is exactly like a Codec, but also requires a ReturnBuffer +// method to be implemented. Once a Marshal caller is done with the returned +// byte buffer, they can choose to return it back to the encoding library for +// re-use using this method. +type BufferedCodec interface { + Codec + // If implemented in a codec, this function may be called with the byte + // buffer returned by Marshal after gRPC is done with the buffer. + // + // gRPC will not call ReturnBuffer after it's done with the buffer if any of + // the following is true: + // 1. Stats handlers are used. + // 2. Binlogs are enabled. 
+ ReturnBuffer(buf []byte) +} + var registeredCodecs = make(map[string]Codec) // RegisterCodec registers the provided Codec for use with all gRPC clients and diff --git a/encoding/proto/proto.go b/encoding/proto/proto.go index 66b97a6f692a..bd4085dbb60d 100644 --- a/encoding/proto/proto.go +++ b/encoding/proto/proto.go @@ -21,7 +21,6 @@ package proto import ( - "math" "sync" "github.com/golang/protobuf/proto" @@ -38,29 +37,16 @@ func init() { // codec is a Codec implementation with protobuf. It is the default codec for gRPC. type codec struct{} -type cachedProtoBuffer struct { - lastMarshaledSize uint32 - proto.Buffer -} - -func capToMaxInt32(val int) uint32 { - if val > math.MaxInt32 { - return uint32(math.MaxInt32) - } - return uint32(val) -} - -func marshal(v interface{}, cb *cachedProtoBuffer) ([]byte, error) { +func marshal(v interface{}, pb *proto.Buffer) ([]byte, error) { protoMsg := v.(proto.Message) - newSlice := make([]byte, 0, cb.lastMarshaledSize) + newSlice := returnBufferPool.Get().([]byte) - cb.SetBuf(newSlice) - cb.Reset() - if err := cb.Marshal(protoMsg); err != nil { + pb.SetBuf(newSlice) + pb.Reset() + if err := pb.Marshal(protoMsg); err != nil { return nil, err } - out := cb.Bytes() - cb.lastMarshaledSize = capToMaxInt32(len(out)) + out := pb.Bytes() return out, nil } @@ -70,12 +56,12 @@ func (codec) Marshal(v interface{}) ([]byte, error) { return pm.Marshal() } - cb := protoBufferPool.Get().(*cachedProtoBuffer) - out, err := marshal(v, cb) + pb := protoBufferPool.Get().(*proto.Buffer) + out, err := marshal(v, pb) // put back buffer and lose the ref to the slice - cb.SetBuf(nil) - protoBufferPool.Put(cb) + pb.SetBuf(nil) + protoBufferPool.Put(pb) return out, err } @@ -88,23 +74,41 @@ func (codec) Unmarshal(data []byte, v interface{}) error { return pu.Unmarshal(data) } - cb := protoBufferPool.Get().(*cachedProtoBuffer) - cb.SetBuf(data) - err := cb.Unmarshal(protoMsg) - cb.SetBuf(nil) - protoBufferPool.Put(cb) + pb := 
protoBufferPool.Get().(*proto.Buffer) + pb.SetBuf(data) + err := pb.Unmarshal(protoMsg) + pb.SetBuf(nil) + protoBufferPool.Put(pb) return err } +func (codec) ReturnBuffer(data []byte) { + // Make sure we set the length of the buffer to zero so that future appends + // will start from the zeroth byte, not append to the previous, stale data. + // + // Apparently, sync.Pool with non-pointer objects (slices, in this case) + // causes small allocations because of how interface{} works under the hood. + // This isn't a problem for us, however, because we're more concerned with + // _how_ much that allocation is. Ideally, we'd be using bytes.Buffer as the + // Marshal return value to remove even that allocation, but we can't change + // the Marshal interface at this point, so disable SA6002. + + //lint:ignore SA6002 Marshal must return []byte, so the small boxing allocation is accepted. + returnBufferPool.Put(data[:0]) +} + func (codec) Name() string { return Name } var protoBufferPool = &sync.Pool{ New: func() interface{} { - return &cachedProtoBuffer{ - Buffer: proto.Buffer{}, - lastMarshaledSize: 16, - } + return &proto.Buffer{} + }, +} + +var returnBufferPool = &sync.Pool{ + New: func() interface{} { + return make([]byte, 0, 16) }, } diff --git a/internal/transport/controlbuf.go b/internal/transport/controlbuf.go index ddee20b6bef2..9fc37f5dcf50 100644 --- a/internal/transport/controlbuf.go +++ b/internal/transport/controlbuf.go @@ -135,7 +135,8 @@ type dataFrame struct { d []byte // onEachWrite is called every time // a part of d is written out. 
- onEachWrite func() + onEachWrite func() + returnBuffer func() } func (*dataFrame) isTransportResponseFrame() bool { return false } @@ -841,6 +842,9 @@ func (l *loopyWriter) processData() (bool, error) { if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil { return false, err } + if dataItem.returnBuffer != nil { + dataItem.returnBuffer() + } str.itl.dequeue() // remove the empty data item from stream if str.itl.isEmpty() { str.state = empty @@ -907,6 +911,9 @@ func (l *loopyWriter) processData() (bool, error) { if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out. str.itl.dequeue() + if dataItem.returnBuffer != nil { + dataItem.returnBuffer() + } } if str.itl.isEmpty() { str.state = empty diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 294661a3f337..e4b8aa1dcdc9 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -832,8 +832,9 @@ func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) e return errStreamDone } df := &dataFrame{ - streamID: s.id, - endStream: opts.Last, + streamID: s.id, + endStream: opts.Last, + returnBuffer: opts.ReturnBuffer, } if hdr != nil || data != nil { // If it's not an empty data frame. // Add some data to grpc message header so that we can equally diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 07603836468f..37baa35fefc5 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -908,10 +908,11 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e hdr = append(hdr, data[:emptyLen]...) 
data = data[emptyLen:] df := &dataFrame{ - streamID: s.id, - h: hdr, - d: data, - onEachWrite: t.setResetPingStrikes, + streamID: s.id, + h: hdr, + d: data, + onEachWrite: t.setResetPingStrikes, + returnBuffer: opts.ReturnBuffer, } if err := s.wq.get(int32(len(hdr) + len(data))); err != nil { select { diff --git a/internal/transport/transport.go b/internal/transport/transport.go index bfab940bd0de..b4a7c46b6b2a 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -586,6 +586,9 @@ type Options struct { // Last indicates whether this write is the last piece for // this stream. Last bool + // If non-nil, ReturnBuffer should be called to return some allocated buffer + // back to a sync pool. + ReturnBuffer func() } // CallHdr carries the information of a particular RPC. diff --git a/preloader.go b/preloader.go index 76acbbcc93b9..3eea4fc576d7 100644 --- a/preloader.go +++ b/preloader.go @@ -28,9 +28,10 @@ import ( // This API is EXPERIMENTAL. type PreparedMsg struct { // Struct for preparing msg before sending them - encodedData []byte - hdr []byte - payload []byte + encodedData []byte + hdr []byte + payload []byte + returnBuffer func() } // Encode marshalls and compresses the message using the codec and compressor for the stream. 
@@ -55,10 +56,18 @@ func (p *PreparedMsg) Encode(s Stream, msg interface{}) error { return err } p.encodedData = data + + if bcodec, ok := rpcInfo.preloaderInfo.codec.(bufferedBaseCodec); ok { + p.returnBuffer = func() { + bcodec.ReturnBuffer(data) + } + } + compData, err := compress(data, rpcInfo.preloaderInfo.cp, rpcInfo.preloaderInfo.comp) if err != nil { return err } + p.hdr, p.payload = msgHeader(data, compData) return nil } diff --git a/server.go b/server.go index e54083d850c1..55513609b4ab 100644 --- a/server.go +++ b/server.go @@ -842,16 +842,25 @@ func (s *Server) incrCallsFailed() { } func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error { - data, err := encode(s.getCodec(stream.ContentSubtype()), msg) + codec := s.getCodec(stream.ContentSubtype()) + data, err := encode(codec, msg) if err != nil { grpclog.Errorln("grpc: server failed to encode response: ", err) return err } + + if bcodec, ok := codec.(bufferedBaseCodec); ok { + opts.ReturnBuffer = func() { + bcodec.ReturnBuffer(data) + } + } + compData, err := compress(data, cp, comp) if err != nil { grpclog.Errorln("grpc: server failed to compress response: ", err) return err } + hdr, payload := msgHeader(data, compData) // TODO(dfawley): should we be checking len(data) instead? if len(payload) > s.opts.maxSendMessageSize { diff --git a/stream.go b/stream.go index bb99940e36fe..aafd72694b62 100644 --- a/stream.go +++ b/stream.go @@ -696,18 +696,26 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { } // load hdr, payload, data - hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp) + hdr, payload, data, returnBuffer, err := prepareMsg(m, cs.codec, cs.cp, cs.comp) if err != nil { return err } + // If binlog is turned on, we cannot return the buffer back to the encoder's + // pool. 
This is because we don't know which will be done first - loopy + // writer or this function. Returning the buffer in one before the other + // finishes is incorrect. + if cs.binlog != nil { + returnBuffer = nil + } + // TODO(dfawley): should we be checking len(data) instead? if len(payload) > *cs.callInfo.maxSendMessageSize { return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize) } msgBytes := data // Store the pointer before setting to nil. For binary logging. op := func(a *csAttempt) error { - err := a.sendMsg(m, hdr, payload, data) + err := a.sendMsg(m, hdr, payload, data, returnBuffer) // nil out the message and uncomp when replaying; they are only needed for // stats which is disabled for subsequent attempts. m, data = nil, nil @@ -833,7 +841,7 @@ func (cs *clientStream) finish(err error) { cs.cancel() } -func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error { +func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte, returnBuffer func()) error { cs := a.cs if a.trInfo != nil { a.mu.Lock() @@ -842,7 +850,16 @@ func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error { } a.mu.Unlock() } - if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil { + + // If a stats handler is in use, we cannot return the buffer back to the encoder's + // pool. This is because we don't know which will be done first - loopy + // writer or this function. Returning the buffer in one before the other + // finishes is incorrect. + if a.statsHandler != nil { + returnBuffer = nil + } + + if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams, ReturnBuffer: returnBuffer}); err != nil { if !cs.desc.ClientStreams { // For non-client-streaming RPCs, we return nil instead of EOF on error // because the generated code requires it. 
finish is not called; RecvMsg() @@ -1165,8 +1182,8 @@ func (as *addrConnStream) SendMsg(m interface{}) (err error) { as.sentLast = true } - // load hdr, payload, data - hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp) + // load hdr, payload, data, returnBuffer + hdr, payld, _, returnBuffer, err := prepareMsg(m, as.codec, as.cp, as.comp) if err != nil { return err } @@ -1176,7 +1193,7 @@ func (as *addrConnStream) SendMsg(m interface{}) (err error) { return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize) } - if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil { + if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams, ReturnBuffer: returnBuffer}); err != nil { if !as.desc.ClientStreams { // For non-client-streaming RPCs, we return nil instead of EOF on error // because the generated code requires it. finish is not called; RecvMsg() @@ -1408,17 +1425,25 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) { } }() - // load hdr, payload, data - hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp) + // load hdr, payload, data, returnBuffer + hdr, payload, data, returnBuffer, err := prepareMsg(m, ss.codec, ss.cp, ss.comp) if err != nil { return err } + // If binlog or stats is turned on, we cannot return the buffer back to the + // encoder's pool. This is because we don't know which will be done first - + // loopy writer or this function. Returning the buffer in one before the + // other finishes is incorrect. + if ss.binlog != nil || ss.statsHandler != nil { + returnBuffer = nil + } + // TODO(dfawley): should we be checking len(data) instead? if len(payload) > ss.maxSendMessageSize { return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. 
%d)", len(payload), ss.maxSendMessageSize) } - if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil { + if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false, ReturnBuffer: returnBuffer}); err != nil { return toRPCErr(err) } if ss.binlog != nil { @@ -1510,20 +1535,29 @@ func MethodFromServerStream(stream ServerStream) (string, bool) { // prepareMsg returns the hdr, payload and data // using the compressors passed or using the // passed preparedmsg -func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) { +func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, returnBuffer func(), err error) { if preparedMsg, ok := m.(*PreparedMsg); ok { - return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil + return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, preparedMsg.returnBuffer, nil } + // The input interface is not a prepared msg. // Marshal and Compress the data at this point data, err = encode(codec, m) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err + } + + if bcodec, ok := codec.(bufferedBaseCodec); ok { + returnBuffer = func() { + bcodec.ReturnBuffer(data) + } } + compData, err := compress(data, cp, comp) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } + hdr, payload = msgHeader(data, compData) - return hdr, payload, data, nil + return hdr, payload, data, returnBuffer, nil }