Skip to content
This repository has been archived by the owner on Sep 1, 2023. It is now read-only.

Commit

Permalink
Finish pooling buffers when marshaling
Browse files Browse the repository at this point in the history
When we marshal length-prefixed messages, it's more efficient and a
little simpler if we always deal in buffers instead of slices.
  • Loading branch information
akshayjshah committed Apr 8, 2022
1 parent 5c6b742 commit fccf9fd
Showing 1 changed file with 15 additions and 7 deletions.
22 changes: 15 additions & 7 deletions protocol_grpc_lpm.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package connect

import (
"bufio"
"bytes"
"encoding/binary"
"errors"
"io"
Expand Down Expand Up @@ -48,7 +49,11 @@ func (m *marshaler) Marshal(message any) *Error {
if err != nil {
return errorf(CodeInternal, "couldn't marshal message: %w", err)
}
return m.writeLPM(false /* trailer */, raw)
// We can't avoid allocating the byte slice, so we may as well reuse it once
// we're done with it.
buffer := bytes.NewBuffer(raw)
defer m.bufferPool.Put(buffer)
return m.writeLPM(false /* trailer */, buffer)
}

func (m *marshaler) MarshalWebTrailers(trailer http.Header) *Error {
Expand All @@ -57,15 +62,18 @@ func (m *marshaler) MarshalWebTrailers(trailer http.Header) *Error {
if err := trailer.Write(raw); err != nil {
return errorf(CodeInternal, "couldn't format trailers: %w", err)
}
return m.writeLPM(true /* trailer */, raw.Bytes())
return m.writeLPM(true /* trailer */, raw)
}

func (m *marshaler) writeLPM(trailer bool, message []byte) *Error {
if m.compressionPool == nil || len(message) < m.compressMinBytes {
if err := m.writeGRPCPrefix(false /* compressed */, trailer, len(message)); err != nil {
// writeLPM writes the message as a gRPC length-prefixed message, compressing
// as necessary. It doesn't retain any references to the supplied buffer or
// its underlying data.
func (m *marshaler) writeLPM(trailer bool, message *bytes.Buffer) *Error {
if m.compressionPool == nil || message.Len() < m.compressMinBytes {
if err := m.writeGRPCPrefix(false /* compressed */, trailer, message.Len()); err != nil {
return err // already enriched
}
if _, err := m.writer.Write(message); err != nil {
if _, err := io.Copy(m.writer, message); err != nil {
return errorf(CodeUnknown, "couldn't write message of length-prefixed message: %w", err)
}
return nil
Expand All @@ -77,7 +85,7 @@ func (m *marshaler) writeLPM(trailer bool, message []byte) *Error {
return errorf(CodeUnknown, "get compressor: %w", err)
}

if _, err := compressor.Write(message); err != nil { // returns uncompressed size, which isn't useful
if _, err := io.Copy(compressor, message); err != nil { // returns uncompressed size, which isn't useful
_ = m.compressionPool.PutCompressor(compressor)
return errorf(CodeInternal, "couldn't compress data: %w", err)
}
Expand Down

0 comments on commit fccf9fd

Please sign in to comment.