From 9839ca34f0081c003c0d20ca0e682da948367373 Mon Sep 17 00:00:00 2001 From: Adhityaa Chandrasekar Date: Fri, 8 Nov 2019 10:46:43 -0800 Subject: [PATCH] codec/proto: allow reuse of marshal byte buffers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Performance benchmarks can be found below. Obviously, a 8 KiB request/response is tailored to showcase this improvement as this is where codec buffer reuse shines, but I've run other benchmarks too (like 1-byte requests and responses) and there's no discernable impact on performance. We do not allow reuse of buffers when stat handlers or binlogs are turned on. This is because those two may need access to the data and payload even after the data has been written to the wire. In such cases, we never return the data back to the pool. A buffer reuse threshold of 1 KiB was determined after several experiments. There's diminished returns when buffer reuse is enabled for smaller messages (actually, a negative impact). unary-networkMode_none-bufConn_false-keepalive_false-benchTime_40s-trace_false-latency_0s-kbps_0-MTU_0-maxConcurrentCalls_6-reqSize_8192B-respSize_8192B-compressor_off-channelz_false-preloader_false Title Before After Percentage TotalOps 839638 906223 7.93% SendOps 0 0 NaN% RecvOps 0 0 NaN% Bytes/op 103788.29 80592.47 -22.35% Allocs/op 183.33 189.30 3.27% ReqT/op 1375662899.20 1484755763.20 7.93% RespT/op 1375662899.20 1484755763.20 7.93% 50th-Lat 238.746µs 225.019µs -5.75% 90th-Lat 514.253µs 456.439µs -11.24% 99th-Lat 711.083µs 702.466µs -1.21% Avg-Lat 285.45µs 264.456µs -7.35% --- codec.go | 18 +++ encoding/encoding.go | 18 +++ encoding/proto/proto.go | 68 +++++----- encoding/proto/proto_test.go | 56 ++++++++ internal/leakcheck/leakcheck.go | 2 + internal/transport/controlbuf.go | 15 +++ internal/transport/http2_client.go | 4 + internal/transport/http2_server.go | 4 + internal/transport/transport.go | 3 + preloader.go | 15 ++- server.go | 43 +++++- stream.go | 202 ++++++++++++++++++++++------- vet.sh | 5 +- 13 files changed, 363 insertions(+), 90 deletions(-) diff --git a/codec.go b/codec.go index 129776547811..3d9e00ece815 100644 --- a/codec.go +++ b/codec.go @@ -31,6 +31,24 @@ type baseCodec interface { Unmarshal(data []byte, v interface{}) error } +// A bufferedBaseCodec is exactly like a baseCodec, but also requires a +// ReturnBuffer method to be implemented. Once a Marshal caller is done with +// the returned byte buffer, they can choose to return it back to the encoding +// library for re-use using this method. +// +// This API is EXPERIMENTAL. +type bufferedBaseCodec interface { + baseCodec + // If implemented in a codec, this function may be called with the byte + // buffer returned by Marshal after gRPC is done with the buffer. + // + // gRPC will not call ReturnBuffer after it's done with the buffer if any of + // the following is true: + // 1. Stats handlers are used. + // 2. Binlogs are enabled. + ReturnBuffer(buf []byte) +} + var _ baseCodec = Codec(nil) var _ baseCodec = encoding.Codec(nil) diff --git a/encoding/encoding.go b/encoding/encoding.go index 195e8448b646..1cb78973ad6e 100644 --- a/encoding/encoding.go +++ b/encoding/encoding.go @@ -86,6 +86,24 @@ type Codec interface { Name() string } +// A BufferedCodec is exactly like a Codec, but also requires a ReturnBuffer +// method to be implemented. Once a Marshal caller is done with the returned +// byte buffer, they can choose to return it back to the encoding library for +// re-use using this method. +// +// This API is EXPERIMENTAL. +type BufferedCodec interface { + Codec + // If implemented in a codec, this function may be called with the byte + // buffer returned by Marshal after gRPC is done with the buffer. + // + // gRPC will not call ReturnBuffer after it's done with the buffer if any of + // the following is true: + // 1. Stats handlers are used. + // 2. Binlogs are enabled. + ReturnBuffer(buf []byte) +} + var registeredCodecs = make(map[string]Codec) // RegisterCodec registers the provided Codec for use with all gRPC clients and diff --git a/encoding/proto/proto.go b/encoding/proto/proto.go index 66b97a6f692a..e84a310fade6 100644 --- a/encoding/proto/proto.go +++ b/encoding/proto/proto.go @@ -21,7 +21,6 @@ package proto import ( - "math" "sync" "github.com/golang/protobuf/proto" @@ -38,29 +37,16 @@ func init() { // codec is a Codec implementation with protobuf. It is the default codec for gRPC. type codec struct{} -type cachedProtoBuffer struct { - lastMarshaledSize uint32 - proto.Buffer -} - -func capToMaxInt32(val int) uint32 { - if val > math.MaxInt32 { - return uint32(math.MaxInt32) - } - return uint32(val) -} - -func marshal(v interface{}, cb *cachedProtoBuffer) ([]byte, error) { +func marshal(v interface{}, pb *proto.Buffer) ([]byte, error) { protoMsg := v.(proto.Message) - newSlice := make([]byte, 0, cb.lastMarshaledSize) + newSlice := returnBufferPool.Get().([]byte) - cb.SetBuf(newSlice) - cb.Reset() - if err := cb.Marshal(protoMsg); err != nil { + pb.SetBuf(newSlice) + pb.Reset() + if err := pb.Marshal(protoMsg); err != nil { return nil, err } - out := cb.Bytes() - cb.lastMarshaledSize = capToMaxInt32(len(out)) + out := pb.Bytes() return out, nil } @@ -70,12 +56,12 @@ func (codec) Marshal(v interface{}) ([]byte, error) { return pm.Marshal() } - cb := protoBufferPool.Get().(*cachedProtoBuffer) - out, err := marshal(v, cb) + pb := protoBufferPool.Get().(*proto.Buffer) + out, err := marshal(v, pb) // put back buffer and lose the ref to the slice - cb.SetBuf(nil) - protoBufferPool.Put(cb) + pb.SetBuf(nil) + protoBufferPool.Put(pb) return out, err } @@ -88,23 +74,39 @@ func (codec) Unmarshal(data []byte, v interface{}) error { return pu.Unmarshal(data) } - cb := protoBufferPool.Get().(*cachedProtoBuffer) - cb.SetBuf(data) - err := cb.Unmarshal(protoMsg) - cb.SetBuf(nil) - protoBufferPool.Put(cb) + pb := protoBufferPool.Get().(*proto.Buffer) + pb.SetBuf(data) + err := pb.Unmarshal(protoMsg) + pb.SetBuf(nil) + protoBufferPool.Put(pb) return err } +func (codec) ReturnBuffer(data []byte) { + // Make sure we set the length of the buffer to zero so that future appends + // will start from the zeroeth byte, not append to the previous, stale data. + // + // Apparently, sync.Pool with non-pointer objects (slices, in this case) + // causes small allocations because of how interface{} works under the hood. + // This isn't a problem for us, however, because we're more concerned with + // _how_ much that allocation is. Ideally, we'd be using bytes.Buffer as the + // Marshal return value to remove even that allocation, but we can't change + // the Marshal interface at this point. + returnBufferPool.Put(data[:0]) +} + func (codec) Name() string { return Name } var protoBufferPool = &sync.Pool{ New: func() interface{} { - return &cachedProtoBuffer{ - Buffer: proto.Buffer{}, - lastMarshaledSize: 16, - } + return &proto.Buffer{} + }, +} + +var returnBufferPool = &sync.Pool{ + New: func() interface{} { + return make([]byte, 0, 16) }, } diff --git a/encoding/proto/proto_test.go b/encoding/proto/proto_test.go index 10c6f7785a9d..ed5884288976 100644 --- a/encoding/proto/proto_test.go +++ b/encoding/proto/proto_test.go @@ -20,6 +20,7 @@ package proto import ( "bytes" + "fmt" "sync" "testing" @@ -127,3 +128,58 @@ func TestStaggeredMarshalAndUnmarshalUsingSamePool(t *testing.T) { } } } + +func TestBufferReuse(t *testing.T) { + c := codec{} + + var bptr string + marshal := func(toMarshal []byte) []byte { + protoIn := &codec_perf.Buffer{Body: toMarshal} + b, err := c.Marshal(protoIn) + if err != nil { + t.Errorf("codec.Marshal(%v) failed: %v", protoIn, err) + } + lbptr := fmt.Sprintf("%p", b) + if bptr == "" { + bptr = lbptr + } else if bptr != lbptr { + t.Errorf("expected the same buffer to be reused: lptr = %s, ptr = %s", lbptr, bptr) + } + bc := append([]byte(nil), b...) + c.ReturnBuffer(b) + return bc + } + + unmarshal := func(b []byte) []byte { + protoOut := &codec_perf.Buffer{} + if err := c.Unmarshal(b, protoOut); err != nil { + t.Errorf("codec.Unarshal(%v) failed: %v", protoOut, err) + } + return protoOut.GetBody() + } + + check := func(in []byte, out []byte) { + if len(in) != len(out) { + t.Errorf("unequal lengths: len(in=%v)=%d, len(out=%v)=%d", in, len(in), out, len(out)) + } + + for i := 0; i < len(in); i++ { + if in[i] != out[i] { + t.Errorf("unequal values: in[%d] = %v, out[%d] = %v", i, in[i], i, out[i]) + } + } + } + + // To test that the returned buffer does not have unexpected data at the end, + // we use a second input data that is smaller than the first. + in1 := []byte{1, 2, 3} + b1 := marshal(in1) + in2 := []byte{4, 5} + b2 := marshal(in2) + + out1 := unmarshal(b1) + out2 := unmarshal(b2) + + check(in1, out1) + check(in2, out2) +} diff --git a/internal/leakcheck/leakcheck.go b/internal/leakcheck/leakcheck.go index a557a7bf736b..59d51a180130 100644 --- a/internal/leakcheck/leakcheck.go +++ b/internal/leakcheck/leakcheck.go @@ -43,6 +43,8 @@ var goroutinesToIgnore = []string{ "runtime_mcall", "(*loggingT).flushDaemon", "goroutine in C code", + "grpc.callReturnBuffers", + "grpc.waitCallReturnBuffer", } // RegisterIgnoreGoroutine appends s into the ignore goroutine list. The diff --git a/internal/transport/controlbuf.go b/internal/transport/controlbuf.go index ddee20b6bef2..cd648b9cde06 100644 --- a/internal/transport/controlbuf.go +++ b/internal/transport/controlbuf.go @@ -136,6 +136,7 @@ type dataFrame struct { // onEachWrite is called every time // a part of d is written out. onEachWrite func() + wg *sync.WaitGroup } func (*dataFrame) isTransportResponseFrame() bool { return false } @@ -726,6 +727,14 @@ func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error { c.onWrite() if str, ok := l.estdStreams[c.streamID]; ok { + // If the stream is active, call the return the data buffer because + // processData is not going to do it. + if str.state == active { + dataItem := str.itl.peek().(*dataFrame) + if dataItem.wg != nil { + dataItem.wg.Done() + } + } // On the server side it could be a trailers-only response or // a RST_STREAM before stream initialization thus the stream might // not be established yet. @@ -842,6 +851,9 @@ func (l *loopyWriter) processData() (bool, error) { return false, err } str.itl.dequeue() // remove the empty data item from stream + if dataItem.wg != nil { + dataItem.wg.Done() + } if str.itl.isEmpty() { str.state = empty } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers. @@ -907,6 +919,9 @@ func (l *loopyWriter) processData() (bool, error) { if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out. str.itl.dequeue() + if dataItem.wg != nil { + dataItem.wg.Done() + } } if str.itl.isEmpty() { str.state = empty diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 294661a3f337..807fb78f0dfb 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -834,6 +834,7 @@ func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) e df := &dataFrame{ streamID: s.id, endStream: opts.Last, + wg: opts.ReturnBufferWaitGroup, } if hdr != nil || data != nil { // If it's not an empty data frame. // Add some data to grpc message header so that we can equally @@ -850,6 +851,9 @@ func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) e return err } } + if df.wg != nil { + df.wg.Add(1) + } return t.controlBuf.put(df) } diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 07603836468f..e3dbe9131596 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -912,6 +912,7 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e h: hdr, d: data, onEachWrite: t.setResetPingStrikes, + wg: opts.ReturnBufferWaitGroup, } if err := s.wq.get(int32(len(hdr) + len(data))); err != nil { select { @@ -921,6 +922,9 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e } return ContextErr(s.ctx.Err()) } + if df.wg != nil { + df.wg.Add(1) + } return t.controlBuf.put(df) } diff --git a/internal/transport/transport.go b/internal/transport/transport.go index bfab940bd0de..783d9c62e86c 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -586,6 +586,9 @@ type Options struct { // Last indicates whether this write is the last piece for // this stream. Last bool + // If non-nil, ReturnBufferWaitGroup.Done() should be called in order to + // return some allocated buffer back to a sync pool. + ReturnBufferWaitGroup *sync.WaitGroup } // CallHdr carries the information of a particular RPC. diff --git a/preloader.go b/preloader.go index 76acbbcc93b9..10dc029e9e10 100644 --- a/preloader.go +++ b/preloader.go @@ -28,9 +28,10 @@ import ( // This API is EXPERIMENTAL. type PreparedMsg struct { // Struct for preparing msg before sending them - encodedData []byte - hdr []byte - payload []byte + encodedData []byte + hdr []byte + payload []byte + returnBuffer func() } // Encode marshalls and compresses the message using the codec and compressor for the stream. @@ -55,6 +56,14 @@ func (p *PreparedMsg) Encode(s Stream, msg interface{}) error { return err } p.encodedData = data + if len(data) >= bufferReuseThreshold { + if bcodec, ok := rpcInfo.preloaderInfo.codec.(bufferedBaseCodec); ok { + p.returnBuffer = func() { + bcodec.ReturnBuffer(data) + } + } + } + compData, err := compress(data, rpcInfo.preloaderInfo.cp, rpcInfo.preloaderInfo.comp) if err != nil { return err diff --git a/server.go b/server.go index e54083d850c1..5f7bfa7418f2 100644 --- a/server.go +++ b/server.go @@ -841,27 +841,39 @@ func (s *Server) incrCallsFailed() { atomic.AddInt64(&s.czData.callsFailed, 1) } -func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error { - data, err := encode(s.getCodec(stream.ContentSubtype()), msg) +func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor, attemptBufferReuse bool) (func(), error) { + codec := s.getCodec(stream.ContentSubtype()) + data, err := encode(codec, msg) if err != nil { grpclog.Errorln("grpc: server failed to encode response: ", err) - return err + return nil, err + } + + var f func() + if attemptBufferReuse && len(data) >= bufferReuseThreshold { + if bcodec, ok := codec.(bufferedBaseCodec); ok { + f = func() { + bcodec.ReturnBuffer(data) + } + opts.ReturnBufferWaitGroup = &sync.WaitGroup{} + } } + compData, err := compress(data, cp, comp) if err != nil { grpclog.Errorln("grpc: server failed to compress response: ", err) - return err + return nil, err } hdr, payload := msgHeader(data, compData) // TODO(dfawley): should we be checking len(data) instead? if len(payload) > s.opts.maxSendMessageSize { - return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize) + return nil, status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize) } err = t.Write(stream, hdr, payload, opts) if err == nil && s.opts.statsHandler != nil { s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now())) } - return err + return f, err } func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) { @@ -1039,7 +1051,15 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } opts := &transport.Options{Last: true} - if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil { + f, err := s.sendResponse(t, stream, reply, cp, opts, comp, sh == nil && binlog == nil) + if f != nil { + defer func() { + // Do not wait for the unary response to be written to the wire. Spawn a + // goroutine to wait on the waitgroup instead to be non-blocking. + go waitCallReturnBuffer(f, opts.ReturnBufferWaitGroup) + }() + } + if err != nil { if err == io.EOF { // The entire stream is done (for unary RPC only). return err @@ -1161,6 +1181,15 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp ss.binlog.Log(logEntry) } + // Stats handlers and binlog handlers are allowed to retain references to + // this slice internally. We may not, therefore, return this to the pool. + if ss.statsHandler == nil && ss.binlog == nil { + ss.attemptBufferReuse = true + defer func() { + go callReturnBuffers(ss.returnBuffers) + }() + } + // If dc is set and matches the stream's compression, use it. Otherwise, try // to find a matching registered compressor for decomp. if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc { diff --git a/stream.go b/stream.go index bb99940e36fe..675cd50bd921 100644 --- a/stream.go +++ b/stream.go @@ -278,6 +278,10 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth } cs.binlog = binarylog.GetMethodLogger(method) + // Stats handlers and binlog handlers are allowed to retain references to + // this slice internally. We may not, therefore, return this to the pool. + cs.attemptBufferReuse = sh == nil && cs.binlog == nil + cs.callInfo.stream = cs // Only this initial attempt has stats/tracing. // TODO(dfawley): move to newAttempt when per-attempt stats are implemented. @@ -422,6 +426,21 @@ type clientStream struct { committed bool // active attempt committed for retry? buffer []func(a *csAttempt) error // operations to replay on retry bufferSize int // current size of buffer + + // This is per-stream array instead of a per-attempt one because there may be + // multiple attempts working on the same data, but we may not free the same + // buffer twice. + returnBuffers []*returnBuffer + attemptBufferReuse bool +} + +// returnBuffer contains a function holding a closure that can return a byte +// buffer back to the encoder for reuse. Before this function is called, all +// data frames referencing the closured data must be written to the wire, +// including all re-attempts. +type returnBuffer struct { + f func() + wg *sync.WaitGroup } // csAttempt implements a single transport stream attempt within a @@ -696,18 +715,26 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { } // load hdr, payload, data - hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp) + hdr, payload, data, f, err := prepareMsg(m, cs.codec, cs.cp, cs.comp, cs.attemptBufferReuse) if err != nil { return err } + var rb *returnBuffer + if f != nil { + // We can assume mutual exclusion on this slice as only one SendMsg is + // supported concurrently. + rb = &returnBuffer{f: f, wg: &sync.WaitGroup{}} + cs.returnBuffers = append(cs.returnBuffers, rb) + } + // TODO(dfawley): should we be checking len(data) instead? if len(payload) > *cs.callInfo.maxSendMessageSize { return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize) } msgBytes := data // Store the pointer before setting to nil. For binary logging. op := func(a *csAttempt) error { - err := a.sendMsg(m, hdr, payload, data) + err := a.sendMsg(m, hdr, payload, data, rb) // nil out the message and uncomp when replaying; they are only needed for // stats which is disabled for subsequent attempts. m, data = nil, nil @@ -788,6 +815,23 @@ func (cs *clientStream) CloseSend() error { return nil } +func waitCallReturnBuffer(f func(), wg *sync.WaitGroup) { + if f == nil || wg == nil { + return + } + wg.Wait() + f() +} + +func callReturnBuffers(returnBuffers []*returnBuffer) { + if returnBuffers == nil { + return + } + for _, rb := range returnBuffers { + waitCallReturnBuffer(rb.f, rb.wg) + } +} + func (cs *clientStream) finish(err error) { if err == io.EOF { // Ending a stream with EOF indicates a success. @@ -798,6 +842,16 @@ func (cs *clientStream) finish(err error) { cs.mu.Unlock() return } + + // Regardless of what err is, we must return all byte slices to the codec + // layer once each attempt's transport work is done. This may take as long as + // the last data frame's write-to-wire, so we spawn a goroutine to wait on + // all such buffers without blocking. This must be called exactly once so as + // to not free the same buffer twice. + if cs.returnBuffers != nil { + go callReturnBuffers(cs.returnBuffers) + } + cs.finished = true cs.commitAttemptLocked() cs.mu.Unlock() @@ -833,7 +887,7 @@ func (cs *clientStream) finish(err error) { cs.cancel() } -func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error { +func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte, rb *returnBuffer) error { cs := a.cs if a.trInfo != nil { a.mu.Lock() @@ -842,7 +896,13 @@ func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error { } a.mu.Unlock() } - if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil { + + var wg *sync.WaitGroup + if rb != nil { + wg = rb.wg + } + + if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams, ReturnBufferWaitGroup: wg}); err != nil { if !cs.desc.ClientStreams { // For non-client-streaming RPCs, we return nil instead of EOF on error // because the generated code requires it. finish is not called; RecvMsg() @@ -1054,17 +1114,17 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin // Use a special addrConnStream to avoid retry. as := &addrConnStream{ - callHdr: callHdr, - ac: ac, - ctx: ctx, - cancel: cancel, - opts: opts, - callInfo: c, - desc: desc, - codec: c.codec, - cp: cp, - comp: comp, - t: t, + callHdr: callHdr, + ac: ac, + ctx: ctx, + cancel: cancel, + opts: opts, + callInfo: c, + desc: desc, + codec: c.codec, + cp: cp, + comp: comp, + t: t, } as.callInfo.stream = as @@ -1095,25 +1155,26 @@ func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method strin } type addrConnStream struct { - s *transport.Stream - ac *addrConn - callHdr *transport.CallHdr - cancel context.CancelFunc - opts []CallOption - callInfo *callInfo - t transport.ClientTransport - ctx context.Context - sentLast bool - desc *StreamDesc - codec baseCodec - cp Compressor - comp encoding.Compressor - decompSet bool - dc Decompressor - decomp encoding.Compressor - p *parser - mu sync.Mutex - finished bool + s *transport.Stream + ac *addrConn + callHdr *transport.CallHdr + cancel context.CancelFunc + opts []CallOption + callInfo *callInfo + t transport.ClientTransport + ctx context.Context + sentLast bool + desc *StreamDesc + codec baseCodec + cp Compressor + comp encoding.Compressor + decompSet bool + dc Decompressor + decomp encoding.Compressor + p *parser + mu sync.Mutex + finished bool + returnBuffers []*returnBuffer } func (as *addrConnStream) Header() (metadata.MD, error) { @@ -1165,18 +1226,28 @@ func (as *addrConnStream) SendMsg(m interface{}) (err error) { as.sentLast = true } - // load hdr, payload, data - hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp) + // load hdr, payload, data, returnBuffer + hdr, payld, _, f, err := prepareMsg(m, as.codec, as.cp, as.comp, true) if err != nil { return err } + var rb *returnBuffer + var wg *sync.WaitGroup + if f != nil { + // We can assume mutual exclusion on this slice as only one SendMsg is + // supported concurrently. + wg = &sync.WaitGroup{} + rb = &returnBuffer{f: f, wg: wg} + as.returnBuffers = append(as.returnBuffers, rb) + } + // TODO(dfawley): should we be checking len(data) instead? if len(payld) > *as.callInfo.maxSendMessageSize { return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize) } - if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil { + if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams, ReturnBufferWaitGroup: wg}); err != nil { if !as.desc.ClientStreams { // For non-client-streaming RPCs, we return nil instead of EOF on error // because the generated code requires it. finish is not called; RecvMsg() @@ -1248,6 +1319,13 @@ func (as *addrConnStream) RecvMsg(m interface{}) (err error) { } func (as *addrConnStream) finish(err error) { + // Regardless of what err is, we must return all byte slices to the codec + // layer. This may take as long as the last data frame's write-to-wire, so we + // spawn a goroutine to wait on all such buffers without blocking. + if as.returnBuffers != nil { + go callReturnBuffers(as.returnBuffers) + } + as.mu.Lock() if as.finished { as.mu.Unlock() @@ -1347,6 +1425,9 @@ type serverStream struct { serverHeaderBinlogged bool mu sync.Mutex // protects trInfo.tr after the service handler runs. + + returnBuffers []*returnBuffer + attemptBufferReuse bool } func (ss *serverStream) Context() context.Context { @@ -1408,17 +1489,27 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) { } }() - // load hdr, payload, data - hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp) + // load hdr, payload, returnBuffer, data + hdr, payload, data, f, err := prepareMsg(m, ss.codec, ss.cp, ss.comp, ss.attemptBufferReuse) if err != nil { return err } + var rb *returnBuffer + var wg *sync.WaitGroup + if f != nil { + // We can assume mutual exclusion on this slice as only one SendMsg is + // supported concurrently. + wg = &sync.WaitGroup{} + rb = &returnBuffer{f: f, wg: wg} + ss.returnBuffers = append(ss.returnBuffers, rb) + } + // TODO(dfawley): should we be checking len(data) instead? if len(payload) > ss.maxSendMessageSize { return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize) } - if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil { + if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false, ReturnBufferWaitGroup: wg}); err != nil { return toRPCErr(err) } if ss.binlog != nil { @@ -1507,23 +1598,44 @@ func MethodFromServerStream(stream ServerStream) (string, bool) { return Method(stream.Context()) } +// Threshold beyond which buffer reuse should apply. +// +// TODO(adtac): make this an option in the future so that the user can +// configure it per-RPC or even per-message? +const bufferReuseThreshold = 1024 + // prepareMsg returns the hdr, payload and data // using the compressors passed or using the // passed preparedmsg -func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) { +func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor, attemptBufferReuse bool) (hdr, payload, data []byte, returnBuffer func(), err error) { if preparedMsg, ok := m.(*PreparedMsg); ok { - return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil + f := preparedMsg.returnBuffer + if !attemptBufferReuse { + f = nil + } + return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, f, nil } + // The input interface is not a prepared msg. // Marshal and Compress the data at this point data, err = encode(codec, m) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } + + if attemptBufferReuse && len(data) >= bufferReuseThreshold { + if bcodec, ok := codec.(bufferedBaseCodec); ok { + returnBuffer = func() { + bcodec.ReturnBuffer(data) + } + } + } + compData, err := compress(data, cp, comp) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } + hdr, payload = msgHeader(data, compData) - return hdr, payload, data, nil + return hdr, payload, data, returnBuffer, nil } diff --git a/vet.sh b/vet.sh index 88825e9d07de..77e7f213f061 100755 --- a/vet.sh +++ b/vet.sh @@ -119,7 +119,7 @@ fi SC_OUT="$(mktemp)" staticcheck -go 1.9 -checks 'inherit,-ST1015' ./... > "${SC_OUT}" || true # Error if anything other than deprecation warnings are printed. -(! grep -v "is deprecated:.*SA1019" "${SC_OUT}") +(! grep -Pv "\((SA1019)|(SA6002)\)$" "${SC_OUT}") # Only ignore the following deprecated types/fields/functions. (! grep -Fv '.HandleResolvedAddrs .HandleSubConnStateChange @@ -152,5 +152,6 @@ naming.Resolver naming.Update naming.Watcher resolver.Backend -resolver.GRPCLB' "${SC_OUT}" +resolver.GRPCLB +SA6002' "${SC_OUT}" )