From fccf9fd128745ac042b8921c42425ea552fb0249 Mon Sep 17 00:00:00 2001 From: Akshay Shah Date: Fri, 8 Apr 2022 16:50:43 -0700 Subject: [PATCH] Finish pooling buffers when marshaling When we marshal length-prefixed messages, it's more efficient and a little simpler if we always deal in buffers instead of slices. --- protocol_grpc_lpm.go | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/protocol_grpc_lpm.go b/protocol_grpc_lpm.go index 3f1f16b8..b295b5bb 100644 --- a/protocol_grpc_lpm.go +++ b/protocol_grpc_lpm.go @@ -16,6 +16,7 @@ package connect import ( "bufio" + "bytes" "encoding/binary" "errors" "io" @@ -48,7 +49,11 @@ func (m *marshaler) Marshal(message any) *Error { if err != nil { return errorf(CodeInternal, "couldn't marshal message: %w", err) } - return m.writeLPM(false /* trailer */, raw) + // We can't avoid allocating the byte slice, so we may as well reuse it once + // we're done with it. + buffer := bytes.NewBuffer(raw) + defer m.bufferPool.Put(buffer) + return m.writeLPM(false /* trailer */, buffer) } func (m *marshaler) MarshalWebTrailers(trailer http.Header) *Error { @@ -57,15 +62,18 @@ func (m *marshaler) MarshalWebTrailers(trailer http.Header) *Error { if err := trailer.Write(raw); err != nil { return errorf(CodeInternal, "couldn't format trailers: %w", err) } - return m.writeLPM(true /* trailer */, raw.Bytes()) + return m.writeLPM(true /* trailer */, raw) } -func (m *marshaler) writeLPM(trailer bool, message []byte) *Error { - if m.compressionPool == nil || len(message) < m.compressMinBytes { - if err := m.writeGRPCPrefix(false /* compressed */, trailer, len(message)); err != nil { +// writeLPM writes the message as a gRPC length-prefixed message, compressing +// as necessary. It doesn't retain any references to the supplied buffer or +// its underlying data. +func (m *marshaler) writeLPM(trailer bool, message *bytes.Buffer) *Error { + if m.compressionPool == nil || message.Len() < m.compressMinBytes { + if err := m.writeGRPCPrefix(false /* compressed */, trailer, message.Len()); err != nil { return err // already enriched } - if _, err := m.writer.Write(message); err != nil { + if _, err := io.Copy(m.writer, message); err != nil { return errorf(CodeUnknown, "couldn't write message of length-prefixed message: %w", err) } return nil @@ -77,7 +85,7 @@ func (m *marshaler) writeLPM(trailer bool, message []byte) *Error { return errorf(CodeUnknown, "get compressor: %w", err) } - if _, err := compressor.Write(message); err != nil { // returns uncompressed size, which isn't useful + if _, err := io.Copy(compressor, message); err != nil { // returns uncompressed size, which isn't useful _ = m.compressionPool.PutCompressor(compressor) return errorf(CodeInternal, "couldn't compress data: %w", err) }