From 028f5ea49ec455aad451d280638f0f66a9667b8d Mon Sep 17 00:00:00 2001 From: bufdev Date: Thu, 7 Apr 2022 16:34:23 -0400 Subject: [PATCH 01/10] Add bufferPool --- client.go | 3 +++ handler.go | 3 +++ header.go | 21 ++++++++------- protocol.go | 3 ++- protocol_grpc.go | 7 +++++ protocol_grpc_client_stream.go | 10 +++++--- protocol_grpc_handler_stream.go | 15 +++++++---- protocol_grpc_handler_stream_test.go | 15 ++++++----- protocol_grpc_lpm.go | 38 ++++++++++++++-------------- protocol_grpc_util.go | 4 +-- 10 files changed, 71 insertions(+), 48 deletions(-) diff --git a/client.go b/client.go index 59d32ac8..5f84a76d 100644 --- a/client.go +++ b/client.go @@ -54,6 +54,7 @@ func NewClient[Req, Res any]( CompressMinBytes: config.CompressMinBytes, HTTPClient: httpClient, URL: url, + BufferPool: config.BufferPool, }) if protocolErr != nil { return nil, protocolErr @@ -170,6 +171,7 @@ type clientConfiguration struct { CompressionPools map[string]*compressionPool Codec Codec RequestCompressionName string + BufferPool *bufferPool } func newClientConfiguration(url string, options []ClientOption) (*clientConfiguration, *Error) { @@ -177,6 +179,7 @@ func newClientConfiguration(url string, options []ClientOption) (*clientConfigur config := clientConfiguration{ Procedure: protoPath, CompressionPools: make(map[string]*compressionPool), + BufferPool: newBufferPool(), } WithProtoBinaryCodec().applyToClient(&config) WithGzip().applyToClient(&config) diff --git a/handler.go b/handler.go index 29bddea9..607ff49e 100644 --- a/handler.go +++ b/handler.go @@ -249,6 +249,7 @@ type handlerConfiguration struct { Procedure string HandleGRPC bool HandleGRPCWeb bool + BufferPool *bufferPool } func newHandlerConfiguration(procedure string, options []HandlerOption) *handlerConfiguration { @@ -259,6 +260,7 @@ func newHandlerConfiguration(procedure string, options []HandlerOption) *handler Codecs: make(map[string]Codec), HandleGRPC: true, HandleGRPCWeb: true, + BufferPool: newBufferPool(), } WithProtoBinaryCodec().applyToHandler(&config) WithProtoJSONCodec().applyToHandler(&config) @@ -293,6 +295,7 @@ func (c *handlerConfiguration) newProtocolHandlers(streamType StreamType) []prot Codecs: codecs, CompressionPools: compressors, CompressMinBytes: c.CompressMinBytes, + BufferPool: c.BufferPool, })) } return handlers diff --git a/header.go b/header.go index 33cfab1a..76b43a97 100644 --- a/header.go +++ b/header.go @@ -15,7 +15,6 @@ package connect import ( - "bytes" "encoding/base64" "fmt" "net/http" @@ -58,12 +57,12 @@ func DecodeBinaryHeader(data string) ([]byte, error) { // References: // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses // https://datatracker.ietf.org/doc/html/rfc3986#section-2.1 -func percentEncode(msg string) string { +func percentEncode(bufferPool *bufferPool, msg string) string { for i := 0; i < len(msg); i++ { // Characters that need to be escaped are defined in gRPC's HTTP/2 spec. // They're different from the generic set defined in RFC 3986. if c := msg[i]; c < ' ' || c > '~' || c == '%' { - return percentEncodeSlow(msg, i) + return percentEncodeSlow(bufferPool, msg, i) } } return msg @@ -71,9 +70,9 @@ func percentEncode(msg string) string { // msg needs some percent-escaping. Bytes before offset don't require // percent-encoding, so they can be copied to the output as-is. 
-func percentEncodeSlow(msg string, offset int) string { - // OPT: easy opportunity to pool buffers - out := bytes.NewBuffer(make([]byte, 0, len(msg))) +func percentEncodeSlow(bufferPool *bufferPool, msg string, offset int) string { + out := bufferPool.Get() + defer bufferPool.Put(out) out.WriteString(msg[:offset]) for i := offset; i < len(msg); i++ { c := msg[i] @@ -86,10 +85,10 @@ func percentEncodeSlow(msg string, offset int) string { return out.String() } -func percentDecode(encoded string) string { +func percentDecode(bufferPool *bufferPool, encoded string) string { for i := 0; i < len(encoded); i++ { if c := encoded[i]; c == '%' && i+2 < len(encoded) { - return percentDecodeSlow(encoded, i) + return percentDecodeSlow(bufferPool, encoded, i) } } return encoded @@ -97,9 +96,9 @@ func percentDecode(encoded string) string { // Similar to percentEncodeSlow: encoded is percent-encoded, and needs to be // decoded byte-by-byte starting at offset. -func percentDecodeSlow(encoded string, offset int) string { - // OPT: easy opportunity to pool buffers - out := bytes.NewBuffer(make([]byte, 0, len(encoded))) +func percentDecodeSlow(bufferPool *bufferPool, encoded string, offset int) string { + out := bufferPool.Get() + defer bufferPool.Put(out) out.WriteString(encoded[:offset]) for i := offset; i < len(encoded); i++ { c := encoded[i] diff --git a/protocol.go b/protocol.go index 15a19171..cbb65200 100644 --- a/protocol.go +++ b/protocol.go @@ -49,6 +49,7 @@ type protocolHandlerParams struct { Codecs readOnlyCodecs CompressionPools readOnlyCompressionPools CompressMinBytes int + BufferPool *bufferPool } // Handler is the server side of a protocol. HTTP handlers typically support @@ -93,7 +94,7 @@ type protocolClientParams struct { CompressMinBytes int HTTPClient HTTPClient URL string - + BufferPool *bufferPool // The gRPC family of protocols always needs access to a Protobuf codec to // marshal and unmarshal errors. Protobuf Codec diff --git a/protocol_grpc.go b/protocol_grpc.go index 18d9d2fb..296cbc5d 100644 --- a/protocol_grpc.go +++ b/protocol_grpc.go @@ -37,6 +37,7 @@ func (g *protocolGRPC) NewHandler(params *protocolHandlerParams) protocolHandler compressionPools: params.CompressionPools, minCompressBytes: params.CompressMinBytes, accept: acceptPostValue(g.web, params.Codecs), + bufferPool: params.BufferPool, } } @@ -59,6 +60,7 @@ func (g *protocolGRPC) NewClient(params *protocolClientParams) (protocolClient, minCompressBytes: params.CompressMinBytes, httpClient: params.HTTPClient, procedureURL: params.URL, + bufferPool: params.BufferPool, }, nil } @@ -69,6 +71,7 @@ type grpcHandler struct { compressionPools readOnlyCompressionPools minCompressBytes int accept string + bufferPool *bufferPool } func (g *grpcHandler) ShouldHandleMethod(method string) bool { @@ -173,6 +176,7 @@ func (g *grpcHandler) NewStream( g.codecs.Protobuf(), // for errors g.compressionPools.Get(requestCompression), g.compressionPools.Get(responseCompression), + g.bufferPool, )) // We can't return failed as-is: a nil *Error is non-nil when returned as an // error interface. 
@@ -212,6 +216,7 @@ type grpcClient struct { httpClient HTTPClient procedureURL string wrapErrorInterceptor Interceptor + bufferPool *bufferPool } func (g *grpcClient) WriteRequestHeader(header http.Header) { @@ -262,6 +267,7 @@ func (g *grpcClient) NewStream( compressionPool: g.compressionPools.Get(g.compressionName), codec: g.codec, compressMinBytes: g.minCompressBytes, + bufferPool: g.bufferPool, }, header: header, trailer: make(http.Header), @@ -270,6 +276,7 @@ func (g *grpcClient) NewStream( responseHeader: make(http.Header), responseTrailer: make(http.Header), compressionPools: g.compressionPools, + bufferPool: g.bufferPool, responseReady: make(chan struct{}), } return g.wrapStream(&clientSender{duplex}, &clientReceiver{duplex}) diff --git a/protocol_grpc_client_stream.go b/protocol_grpc_client_stream.go index 2f132517..03e50a21 100644 --- a/protocol_grpc_client_stream.go +++ b/protocol_grpc_client_stream.go @@ -88,6 +88,7 @@ type duplexClientStream struct { responseReady chan struct{} unmarshaler unmarshaler compressionPools readOnlyCompressionPools + bufferPool *bufferPool errMu sync.Mutex requestErr error @@ -185,7 +186,7 @@ func (cs *duplexClientStream) Receive(message any) error { if errors.Is(err, errGotWebTrailers) { mergeHeaders(cs.responseTrailer, cs.unmarshaler.WebTrailer()) } - if serverErr := extractError(cs.protobuf, cs.responseTrailer); serverErr != nil { + if serverErr := extractError(cs.bufferPool, cs.protobuf, cs.responseTrailer); serverErr != nil { // This is expected from a protocol perspective, but receiving trailers // means that we're _not_ getting a message. For users to realize that // the stream has ended, Receive must return an error. @@ -312,7 +313,7 @@ func (cs *duplexClientStream) makeRequest(prepared chan struct{}) { // DATA frames have been sent on the stream - isn't standard HTTP/2 // semantics, so net/http doesn't know anything about it. To us, then, these // trailers-only responses actually appear as headers-only responses. - if err := extractError(cs.protobuf, res.Header); err != nil { + if err := extractError(cs.bufferPool, cs.protobuf, res.Header); err != nil { // Per the specification, only the HTTP status code and Content-Type should // be treated as headers. The rest should be treated as trailing metadata. if contentType := res.Header.Get("Content-Type"); contentType != "" { @@ -338,6 +339,7 @@ func (cs *duplexClientStream) makeRequest(prepared chan struct{}) { codec: cs.codec, compressionPool: cs.compressionPools.Get(compression), web: cs.web, + bufferPool: cs.bufferPool, } } @@ -395,7 +397,7 @@ func (cs *duplexClientStream) getRequestOrResponseError() error { // binary Protobuf format, even if the messages in the request/response stream // use a different codec. Consequently, this function needs a Protobuf codec to // unmarshal error information in the headers. 
-func extractError(protobuf Codec, trailer http.Header) *Error { +func extractError(bufferPool *bufferPool, protobuf Codec, trailer http.Header) *Error { codeHeader := trailer.Get("Grpc-Status") if codeHeader == "" || codeHeader == "0" { return nil @@ -405,7 +407,7 @@ func extractError(protobuf Codec, trailer http.Header) *Error { if err != nil { return errorf(CodeUnknown, "gRPC protocol error: got invalid error code %q", codeHeader) } - message := percentDecode(trailer.Get("Grpc-Message")) + message := percentDecode(bufferPool, trailer.Get("Grpc-Message")) retErr := NewError(Code(code), errors.New(message)) detailsBinaryEncoded := trailer.Get("Grpc-Status-Details-Bin") diff --git a/protocol_grpc_handler_stream.go b/protocol_grpc_handler_stream.go index 4aa5eee0..3616c942 100644 --- a/protocol_grpc_handler_stream.go +++ b/protocol_grpc_handler_stream.go @@ -32,6 +32,7 @@ func newHandlerStream( protobuf Codec, // for errors requestCompressionPools *compressionPool, responseCompressionPools *compressionPool, + bufferPool *bufferPool, ) (*handlerSender, *handlerReceiver) { sender := &handlerSender{ spec: spec, @@ -41,11 +42,13 @@ func newHandlerStream( compressionPool: responseCompressionPools, codec: codec, compressMinBytes: compressMinBytes, + bufferPool: bufferPool, }, - protobuf: protobuf, - writer: responseWriter, - header: make(http.Header), - trailer: make(http.Header), + protobuf: protobuf, + writer: responseWriter, + header: make(http.Header), + trailer: make(http.Header), + bufferPool: bufferPool, } receiver := &handlerReceiver{ spec: spec, @@ -54,6 +57,7 @@ func newHandlerStream( reader: request.Body, compressionPool: requestCompressionPools, codec: codec, + bufferPool: bufferPool, }, request: request, } @@ -69,6 +73,7 @@ type handlerSender struct { header http.Header trailer http.Header wroteToBody bool + bufferPool *bufferPool } var _ Sender = (*handlerSender)(nil) @@ -97,7 +102,7 @@ func (hs *handlerSender) Close(err error) error { // mutate the trailers map that the user sees. 
mergedTrailers := make(http.Header, len(hs.trailer)+2) // always make space for status & message mergeHeaders(mergedTrailers, hs.trailer) - if marshalErr := grpcErrorToTrailer(mergedTrailers, hs.protobuf, err); marshalErr != nil { + if marshalErr := grpcErrorToTrailer(hs.bufferPool, mergedTrailers, hs.protobuf, err); marshalErr != nil { return marshalErr } if hs.web && !hs.wroteToBody { diff --git a/protocol_grpc_handler_stream_test.go b/protocol_grpc_handler_stream_test.go index 38ba826e..238d7b84 100644 --- a/protocol_grpc_handler_stream_test.go +++ b/protocol_grpc_handler_stream_test.go @@ -28,16 +28,19 @@ func TestGRPCHandlerSender(t *testing.T) { newSender := func(web bool) *handlerSender { responseWriter := httptest.NewRecorder() protobufCodec := &protoBinaryCodec{} + bufferPool := newBufferPool() return &handlerSender{ web: web, marshaler: marshaler{ - writer: responseWriter, - codec: protobufCodec, + writer: responseWriter, + codec: protobufCodec, + bufferPool: bufferPool, }, - protobuf: protobufCodec, - writer: responseWriter, - header: make(http.Header), - trailer: make(http.Header), + protobuf: protobufCodec, + writer: responseWriter, + header: make(http.Header), + trailer: make(http.Header), + bufferPool: bufferPool, } } t.Run("web", func(t *testing.T) { diff --git a/protocol_grpc_lpm.go b/protocol_grpc_lpm.go index d59ff99b..e8fd892a 100644 --- a/protocol_grpc_lpm.go +++ b/protocol_grpc_lpm.go @@ -16,7 +16,6 @@ package connect import ( "bufio" - "bytes" "encoding/binary" "errors" "io" @@ -41,6 +40,7 @@ type marshaler struct { compressionPool *compressionPool codec Codec compressMinBytes int + bufferPool *bufferPool } func (m *marshaler) Marshal(message any) *Error { @@ -52,8 +52,8 @@ func (m *marshaler) Marshal(message any) *Error { } func (m *marshaler) MarshalWebTrailers(trailer http.Header) *Error { - // OPT: easy opportunity to pool buffers - raw := bytes.NewBuffer(nil) + raw := m.bufferPool.Get() + defer m.bufferPool.Put(raw) if err := trailer.Write(raw); err != nil { return errorf(CodeInternal, "couldn't format trailers: %w", err) } @@ -70,8 +70,8 @@ func (m *marshaler) writeLPM(trailer bool, message []byte) *Error { } return nil } - // OPT: easy opportunity to pool buffers - data := bytes.NewBuffer(make([]byte, 0, len(message))) + data := m.bufferPool.Get() + defer m.bufferPool.Put(data) compressor, err := m.compressionPool.GetCompressor(data) if err != nil { return errorf(CodeUnknown, "get compressor: %w", err) @@ -121,9 +121,9 @@ type unmarshaler struct { reader io.Reader codec Codec compressionPool *compressionPool - - web bool - webTrailer http.Header + bufferPool *bufferPool + web bool + webTrailer http.Header } func (u *unmarshaler) Unmarshal(message any) (retErr *Error) { @@ -169,8 +169,8 @@ func (u *unmarshaler) Unmarshal(message any) (retErr *Error) { if size < 0 { return errorf(CodeInvalidArgument, "message size %d overflowed uint32", size) } - // OPT: easy opportunity to pool buffers and grab the underlying byte slice - raw := make([]byte, size) + raw := u.bufferPool.Get() + defer u.bufferPool.Put(raw) if size > 0 { // At layer 7, we don't know exactly what's happening down in L4. Large // length-prefixed messages may arrive in chunks, so we may need to read @@ -178,7 +178,7 @@ func (u *unmarshaler) Unmarshal(message any) (retErr *Error) { // forever if the LPM is malformed. 
remaining := size for remaining > 0 { - bytesRead, err := u.reader.Read(raw[size-remaining : size]) + bytesRead, err := u.reader.Read(raw.Bytes()[size-remaining : size]) if err != nil && !errors.Is(err, io.EOF) { return errorf(CodeUnknown, "error reading length-prefixed message data: %w", err) } @@ -204,12 +204,12 @@ func (u *unmarshaler) Unmarshal(message any) (retErr *Error) { } if size > 0 && compressed { - decompressor, err := u.compressionPool.GetDecompressor(bytes.NewReader(raw)) + decompressor, err := u.compressionPool.GetDecompressor(raw) if err != nil { return errorf(CodeInvalidArgument, "can't decompress: %w", err) } - // OPT: easy opportunity to pool buffers - decompressed := bytes.NewBuffer(make([]byte, 0, len(raw))) + decompressed := u.bufferPool.Get() + defer u.bufferPool.Put(decompressed) if _, err := decompressed.ReadFrom(decompressor); err != nil { _ = u.compressionPool.PutDecompressor(decompressor) return errorf(CodeInvalidArgument, "can't decompress: %w", err) @@ -217,22 +217,22 @@ func (u *unmarshaler) Unmarshal(message any) (retErr *Error) { if err := u.compressionPool.PutDecompressor(decompressor); err != nil { return errorf(CodeUnknown, "recycle decompressor: %w", err) } - raw = decompressed.Bytes() + raw = decompressed } if isWebTrailer { // Per the gRPC-Web specification, trailers should be encoded as an HTTP/1 // headers block _without_ the terminating newline. To make the headers // parseable by net/textproto, we need to add the newline. - raw = append(raw, '\n') // nolint:makezero - bufferedReader := bufio.NewReader(bytes.NewReader(raw)) + raw.WriteRune('\n') + bufferedReader := bufio.NewReader(raw) mimeReader := textproto.NewReader(bufferedReader) mimeHeader, err := mimeReader.ReadMIMEHeader() if err != nil { return errorf( CodeInvalidArgument, "gRPC-Web protocol error: received invalid trailers %q: %w", - string(raw), + raw.String(), err, ) } @@ -240,7 +240,7 @@ func (u *unmarshaler) Unmarshal(message any) (retErr *Error) { return errGotWebTrailers } - if err := u.codec.Unmarshal(raw, message); err != nil { + if err := u.codec.Unmarshal(raw.Bytes(), message); err != nil { return errorf(CodeInvalidArgument, "can't unmarshal into %T: %w", message, err) } diff --git a/protocol_grpc_util.go b/protocol_grpc_util.go index 8de50452..ad4e5069 100644 --- a/protocol_grpc_util.go +++ b/protocol_grpc_util.go @@ -86,7 +86,7 @@ func contentTypeFromCodecName(web bool, name string) string { return typeDefaultGRPCPrefix + name } -func grpcErrorToTrailer(trailer http.Header, protobuf Codec, err error) error { +func grpcErrorToTrailer(bufferPool *bufferPool, trailer http.Header, protobuf Codec, err error) error { const ( statusKey = "Grpc-Status" messageKey = "Grpc-Message" @@ -110,7 +110,7 @@ func grpcErrorToTrailer(trailer http.Header, protobuf Codec, err error) error { return errorf(CodeInternal, "couldn't marshal protobuf status: %w", err) } trailer.Set(statusKey, code) - trailer.Set(messageKey, percentEncode(status.Message)) + trailer.Set(messageKey, percentEncode(bufferPool, status.Message)) trailer.Set(detailsKey, EncodeBinaryHeader(bin)) return nil } From c33bbce2a27d2150e54870acac7b77512a21fa67 Mon Sep 17 00:00:00 2001 From: bufdev Date: Fri, 8 Apr 2022 12:03:32 -0400 Subject: [PATCH 02/10] Actually add buffer_pool.go --- buffer_pool.go | 51 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 buffer_pool.go diff --git a/buffer_pool.go b/buffer_pool.go new file mode 100644 index 00000000..8570a0ee --- /dev/null +++ 
b/buffer_pool.go @@ -0,0 +1,51 @@ +// Copyright 2021-2022 Buf Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connect + +import ( + "bytes" + "sync" +) + +const ( + initialBufferSize = 512 + maxRecycleBufferSize = 1024 * 1024 // if >1MiB, don't hold onto a buffer +) + +type bufferPool struct { + sync.Pool +} + +func newBufferPool() *bufferPool { + return &bufferPool{ + Pool: sync.Pool{ + New: func() any { + return bytes.NewBuffer(make([]byte, 0, initialBufferSize)) + }, + }, + } +} + +func (b *bufferPool) Get() *bytes.Buffer { + return b.Pool.Get().(*bytes.Buffer) +} + +func (b *bufferPool) Put(buffer *bytes.Buffer) { + if buffer.Cap() > maxRecycleBufferSize { + return + } + buffer.Reset() + b.Pool.Put(buffer) +} From bd38b62f27dad7f024b74754c364fc3d81f390e3 Mon Sep 17 00:00:00 2001 From: Akshay Shah Date: Fri, 8 Apr 2022 16:39:14 -0700 Subject: [PATCH 03/10] Hold onto buffers more aggressively --- buffer_pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buffer_pool.go b/buffer_pool.go index 8570a0ee..4fa00b9b 100644 --- a/buffer_pool.go +++ b/buffer_pool.go @@ -21,7 +21,7 @@ import ( const ( initialBufferSize = 512 - maxRecycleBufferSize = 1024 * 1024 // if >1MiB, don't hold onto a buffer + maxRecycleBufferSize = 8 * 1024 * 1024 // if >8MiB, don't hold onto a buffer ) type bufferPool struct { From 7a4911e365e8a329593dc5e099c859e6b21aa614 Mon Sep 17 00:00:00 2001 From: Akshay Shah Date: Fri, 8 Apr 2022 16:39:50 -0700 Subject: [PATCH 04/10] Always comma-ok type assertions --- buffer_pool.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/buffer_pool.go b/buffer_pool.go index 4fa00b9b..13f8a9e8 100644 --- a/buffer_pool.go +++ b/buffer_pool.go @@ -39,7 +39,10 @@ func newBufferPool() *bufferPool { } func (b *bufferPool) Get() *bytes.Buffer { - return b.Pool.Get().(*bytes.Buffer) + if buf, ok := b.Pool.Get().(*bytes.Buffer); ok { + return buf + } + return bytes.NewBuffer(make([]byte, 0, initialBufferSize)) } func (b *bufferPool) Put(buffer *bytes.Buffer) { From edfbf47b550a69ccbf7d2c9a4b280309e2ed1376 Mon Sep 17 00:00:00 2001 From: Akshay Shah Date: Fri, 8 Apr 2022 16:40:13 -0700 Subject: [PATCH 05/10] Supply pools in header tests --- header_test.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/header_test.go b/header_test.go index 559609e4..89ce25cd 100644 --- a/header_test.go +++ b/header_test.go @@ -42,12 +42,13 @@ func TestBinaryEncodingQuick(t *testing.T) { func TestPercentEncodingQuick(t *testing.T) { t.Parallel() + pool := newBufferPool() roundtrip := func(input string) bool { if !utf8.ValidString(input) { return true } - encoded := percentEncode(input) - decoded := percentDecode(encoded) + encoded := percentEncode(pool, input) + decoded := percentDecode(pool, encoded) return decoded == input } if err := quick.Check(roundtrip, nil /* config */); err != nil { @@ -57,11 +58,12 @@ func TestPercentEncodingQuick(t *testing.T) { func TestPercentEncoding(t *testing.T) { 
t.Parallel() + pool := newBufferPool() roundtrip := func(input string) { assert.True(t, utf8.ValidString(input), assert.Sprintf("input invalid UTF-8")) - encoded := percentEncode(input) + encoded := percentEncode(pool, input) t.Logf("%q encoded as %q", input, encoded) - decoded := percentDecode(encoded) + decoded := percentDecode(pool, encoded) assert.Equal(t, decoded, input) } From ac70f68a9f2346fadc05926f2b132f5b497c3479 Mon Sep 17 00:00:00 2001 From: Akshay Shah Date: Fri, 8 Apr 2022 16:50:43 -0700 Subject: [PATCH 06/10] Finish pooling buffers when marshaling When we marshal length-prefixed messages, it's more efficient and a little simpler if we always deal in buffers instead of slices. --- protocol_grpc_lpm.go | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/protocol_grpc_lpm.go b/protocol_grpc_lpm.go index e8fd892a..a17cea19 100644 --- a/protocol_grpc_lpm.go +++ b/protocol_grpc_lpm.go @@ -16,6 +16,7 @@ package connect import ( "bufio" + "bytes" "encoding/binary" "errors" "io" @@ -48,7 +49,11 @@ func (m *marshaler) Marshal(message any) *Error { if err != nil { return errorf(CodeInternal, "couldn't marshal message: %w", err) } - return m.writeLPM(false /* trailer */, raw) + // We can't avoid allocating the byte slice, so we may as well reuse it once + // we're done with it. + buffer := bytes.NewBuffer(raw) + defer m.bufferPool.Put(buffer) + return m.writeLPM(false /* trailer */, buffer) } func (m *marshaler) MarshalWebTrailers(trailer http.Header) *Error { @@ -57,15 +62,18 @@ func (m *marshaler) MarshalWebTrailers(trailer http.Header) *Error { if err := trailer.Write(raw); err != nil { return errorf(CodeInternal, "couldn't format trailers: %w", err) } - return m.writeLPM(true /* trailer */, raw.Bytes()) + return m.writeLPM(true /* trailer */, raw) } -func (m *marshaler) writeLPM(trailer bool, message []byte) *Error { - if m.compressionPool == nil || len(message) < m.compressMinBytes { - if err := m.writeGRPCPrefix(false /* compressed */, trailer, len(message)); err != nil { +// writeLPM writes the message as a gRPC length-prefixed message, compressing +// as necessary. It doesn't retain any references to the supplied buffer or +// its underlying data. +func (m *marshaler) writeLPM(trailer bool, message *bytes.Buffer) *Error { + if m.compressionPool == nil || message.Len() < m.compressMinBytes { + if err := m.writeGRPCPrefix(false /* compressed */, trailer, message.Len()); err != nil { return err // already enriched } - if _, err := m.writer.Write(message); err != nil { + if _, err := io.Copy(m.writer, message); err != nil { return errorf(CodeUnknown, "couldn't write message of length-prefixed message: %w", err) } return nil @@ -77,7 +85,7 @@ func (m *marshaler) writeLPM(trailer bool, message []byte) *Error { return errorf(CodeUnknown, "get compressor: %w", err) } - if _, err := compressor.Write(message); err != nil { // returns uncompressed size, which isn't useful + if _, err := io.Copy(compressor, message); err != nil { // returns uncompressed size, which isn't useful _ = m.compressionPool.PutCompressor(compressor) return errorf(CodeInternal, "couldn't compress data: %w", err) } From 8016997fc0eb99eb99bd7736b4da4db401513215 Mon Sep 17 00:00:00 2001 From: Akshay Shah Date: Fri, 8 Apr 2022 16:53:40 -0700 Subject: [PATCH 07/10] Fix pooling when unmarshaling length-prefixed messages When we want to use a pooled buffer as a byte slice, we have to first call `Grow` (to ensure adequate capacity) and then reslice to set `len(raw)` to `cap(raw)`. 
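For illustration, the pattern reads roughly like this (a sketch only: `pool`, `reader`, and `size` stand in for the unmarshaler's fields, and io.ReadFull stands in for the chunked read loop in the real code):

    rawBuffer := pool.Get()           // pooled *bytes.Buffer, already Reset
    defer pool.Put(rawBuffer)         // recycle the backing array when we're done
    rawBuffer.Grow(size)              // guarantee capacity for the whole message
    raw := rawBuffer.Bytes()[0:size]  // reslice so len(raw) == size
    if _, err := io.ReadFull(reader, raw); err != nil {
        return err
    }
    // raw is now ready for codec.Unmarshal; it must not escape this function,
    // because its backing array goes back to the pool on the deferred Put.

Reslicing past the buffer's Len() is legal as long as the new length stays within Cap(), which the preceding Grow guarantees.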
--- protocol_grpc_lpm.go | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/protocol_grpc_lpm.go b/protocol_grpc_lpm.go index a17cea19..26148070 100644 --- a/protocol_grpc_lpm.go +++ b/protocol_grpc_lpm.go @@ -177,8 +177,10 @@ func (u *unmarshaler) Unmarshal(message any) (retErr *Error) { if size < 0 { return errorf(CodeInvalidArgument, "message size %d overflowed uint32", size) } - raw := u.bufferPool.Get() - defer u.bufferPool.Put(raw) + rawBuffer := u.bufferPool.Get() + defer u.bufferPool.Put(rawBuffer) + rawBuffer.Grow(size) + raw := rawBuffer.Bytes()[0:size] if size > 0 { // At layer 7, we don't know exactly what's happening down in L4. Large // length-prefixed messages may arrive in chunks, so we may need to read @@ -186,7 +188,7 @@ func (u *unmarshaler) Unmarshal(message any) (retErr *Error) { // forever if the LPM is malformed. remaining := size for remaining > 0 { - bytesRead, err := u.reader.Read(raw.Bytes()[size-remaining : size]) + bytesRead, err := u.reader.Read(raw[size-remaining : size]) if err != nil && !errors.Is(err, io.EOF) { return errorf(CodeUnknown, "error reading length-prefixed message data: %w", err) } @@ -212,7 +214,7 @@ func (u *unmarshaler) Unmarshal(message any) (retErr *Error) { } if size > 0 && compressed { - decompressor, err := u.compressionPool.GetDecompressor(raw) + decompressor, err := u.compressionPool.GetDecompressor(bytes.NewReader(raw)) if err != nil { return errorf(CodeInvalidArgument, "can't decompress: %w", err) } @@ -225,22 +227,22 @@ func (u *unmarshaler) Unmarshal(message any) (retErr *Error) { if err := u.compressionPool.PutDecompressor(decompressor); err != nil { return errorf(CodeUnknown, "recycle decompressor: %w", err) } - raw = decompressed + raw = decompressed.Bytes() } if isWebTrailer { // Per the gRPC-Web specification, trailers should be encoded as an HTTP/1 // headers block _without_ the terminating newline. To make the headers // parseable by net/textproto, we need to add the newline. - raw.WriteRune('\n') - bufferedReader := bufio.NewReader(raw) + raw = append(raw, '\n') + bufferedReader := bufio.NewReader(bytes.NewReader(raw)) mimeReader := textproto.NewReader(bufferedReader) mimeHeader, err := mimeReader.ReadMIMEHeader() if err != nil { return errorf( CodeInvalidArgument, "gRPC-Web protocol error: received invalid trailers %q: %w", - raw.String(), + string(raw), err, ) } @@ -248,7 +250,7 @@ func (u *unmarshaler) Unmarshal(message any) (retErr *Error) { return errGotWebTrailers } - if err := u.codec.Unmarshal(raw.Bytes(), message); err != nil { + if err := u.codec.Unmarshal(raw, message); err != nil { return errorf(CodeInvalidArgument, "can't unmarshal into %T: %w", message, err) } From d429555ce0e68d0bcde22bd74781bbbeb19523cc Mon Sep 17 00:00:00 2001 From: Akshay Shah Date: Fri, 8 Apr 2022 20:42:29 -0700 Subject: [PATCH 08/10] Make benchmark use more realistic size Currently, the benchmark is a bit unrealistic - the wire size of the payload is single-digit bytes. This makes the payload slightly bigger, though it still compresses unrealistically well. 
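The payload is still a single repeated byte, so gzip collapses it to a tiny fraction of its wire size. A rough illustration, not part of the change:

    payload := strings.Repeat("a", 2*1024*1024) // same construction as the benchmark
    var compressed bytes.Buffer
    gzipWriter := gzip.NewWriter(&compressed)
    _, _ = gzipWriter.Write([]byte(payload))
    _ = gzipWriter.Close()
    // compressed.Len() ends up orders of magnitude smaller than 2 MiB.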
--- bench_test.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/bench_test.go b/bench_test.go index d9de3d55..8adbed9e 100644 --- a/bench_test.go +++ b/bench_test.go @@ -56,6 +56,7 @@ func BenchmarkConnect(b *testing.B) { connect.WithGzipRequests(), ) assert.Nil(b, err) + twoMiB := strings.Repeat("a", 2*1024*1024) b.ResetTimer() b.Run("unary", func(b *testing.B) { @@ -63,7 +64,7 @@ func BenchmarkConnect(b *testing.B) { for pb.Next() { _, _ = client.Ping( context.Background(), - connect.NewRequest(&pingv1.PingRequest{Number: 42}), + connect.NewRequest(&pingv1.PingRequest{Text: twoMiB}), ) } }) @@ -71,7 +72,7 @@ func BenchmarkConnect(b *testing.B) { } type ping struct { - Number int `json:"number"` + Text string `json:"text"` } func BenchmarkREST(b *testing.B) { @@ -118,23 +119,24 @@ func BenchmarkREST(b *testing.B) { server.EnableHTTP2 = true server.StartTLS() defer server.Close() + twoMiB := strings.Repeat("a", 2*1024*1024) b.ResetTimer() b.Run("unary", func(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { - unaryRESTIteration(b, server.Client(), server.URL) + unaryRESTIteration(b, server.Client(), server.URL, twoMiB) } }) }) } -func unaryRESTIteration(b *testing.B, client *http.Client, url string) { +func unaryRESTIteration(b *testing.B, client *http.Client, url string, text string) { b.Helper() rawRequestBody := bytes.NewBuffer(nil) compressedRequestBody := gzip.NewWriter(rawRequestBody) encoder := json.NewEncoder(compressedRequestBody) - if err := encoder.Encode(&ping{42}); err != nil { + if err := encoder.Encode(&ping{text}); err != nil { b.Fatalf("marshal request: %v", err) } compressedRequestBody.Close() From 554965058ec7b61411d91fff1022c21c2cf34a45 Mon Sep 17 00:00:00 2001 From: Akshay Shah Date: Fri, 8 Apr 2022 21:59:56 -0700 Subject: [PATCH 09/10] Zero pooled slice --- protocol_grpc_lpm.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/protocol_grpc_lpm.go b/protocol_grpc_lpm.go index 26148070..e73d5ecb 100644 --- a/protocol_grpc_lpm.go +++ b/protocol_grpc_lpm.go @@ -181,6 +181,11 @@ func (u *unmarshaler) Unmarshal(message any) (retErr *Error) { defer u.bufferPool.Put(rawBuffer) rawBuffer.Grow(size) raw := rawBuffer.Bytes()[0:size] + // We're careful to read fill this slice, but zero'ing it is a nice extra + // layer of safety. + for i := range raw { + raw[i] = 0 + } if size > 0 { // At layer 7, we don't know exactly what's happening down in L4. Large // length-prefixed messages may arrive in chunks, so we may need to read From d838c549d7eb96c76c329ded6cd5903ce6099bf2 Mon Sep 17 00:00:00 2001 From: bufdev Date: Sat, 9 Apr 2022 11:05:33 -0400 Subject: [PATCH 10/10] Add comment about zeroing --- protocol_grpc_lpm.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/protocol_grpc_lpm.go b/protocol_grpc_lpm.go index e73d5ecb..a7e9afaa 100644 --- a/protocol_grpc_lpm.go +++ b/protocol_grpc_lpm.go @@ -183,6 +183,8 @@ func (u *unmarshaler) Unmarshal(message any) (retErr *Error) { raw := rawBuffer.Bytes()[0:size] // We're careful to read fill this slice, but zero'ing it is a nice extra // layer of safety. + // Do not change zero'ing mechanism: this form is specially recognized + // by the compiler per https://golang.org/cl/137880043. for i := range raw { raw[i] = 0 }
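Taken together, the series threads a single *bufferPool (the small sync.Pool wrapper added in buffer_pool.go) from clientConfiguration and handlerConfiguration down to the marshaler, unmarshaler, and the percent-encoding helpers. A minimal sketch of the Get/defer-Put shape used throughout, with a hypothetical function name; the real call sites are in the diffs above:

    func buildValue(pool *bufferPool, parts ...string) string {
        out := pool.Get()   // *bytes.Buffer with at least initialBufferSize capacity, already Reset
        defer pool.Put(out) // recycled unless it grew past maxRecycleBufferSize
        for _, part := range parts {
            out.WriteString(part)
        }
        return out.String() // String copies the bytes, so recycling on defer is safe
    }

The same discipline appears at each call site in the diffs: the pooled buffer's contents are copied out or fully consumed before Put returns the backing array to the pool, so no reference to recycled storage escapes.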