Skip to content

Commit

Permalink
Change the code to use msgp. Add benchmarks.
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidtw committed Feb 3, 2025
1 parent 5faa7fb commit e5b1e6a
Show file tree
Hide file tree
Showing 9 changed files with 685 additions and 123 deletions.
165 changes: 90 additions & 75 deletions format.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ package wrp

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"strings"

"github.com/ugorji/go/codec"
)

//go:generate go install golang.org/x/tools/cmd/stringer@latest
Expand All @@ -25,7 +24,9 @@ const (
Msgpack Format = iota
JSON
lastFormat
)

const (
MimeTypeMsgpack = "application/msgpack"
MimeTypeJson = "application/json"
MimeTypeOctetStream = "application/octet-stream"
Expand All @@ -40,29 +41,6 @@ func AllFormats() []Format {
return []Format{Msgpack, JSON}
}

var (
jsonHandle = codec.JsonHandle{
// TODO replace `codec.BasicHandle` since it's not meant to be used directly
// nolint:staticcheck
BasicHandle: codec.BasicHandle{
TypeInfos: codec.NewTypeInfos([]string{"json"}),
},
IntegerAsString: 'L',
}

// msgpackHandle uses the configuration required for the updated msgpack spec.
// this is what's required to ensure that the Payload field is encoded and decoded properly.
// See: http://ugorji.net/blog/go-codec-primer#format-specific-runtime-configuration
msgpackHandle = codec.MsgpackHandle{
WriteExt: true,
// TODO replace `codec.BasicHandle` since it's not meant to be used directly
// nolint:staticcheck
BasicHandle: codec.BasicHandle{
TypeInfos: codec.NewTypeInfos([]string{"json"}),
},
}
)

// ContentType returns the MIME type associated with this format
func (f Format) ContentType() string {
switch f {
Expand Down Expand Up @@ -102,84 +80,121 @@ func FormatFromContentType(contentType string, fallback ...Format) (Format, erro
return Format(-1), fmt.Errorf("invalid WRP content type: %s", contentType)
}

// handle looks up the appropriate codec.Handle for this format constant.
// This method panics if the format is not a valid value.
func (f Format) handle() codec.Handle {
// Encoder represents the underlying ugorji behavior that WRP supports
type Encoder interface {
Encode(*Message) error
}

// Decoder represents the underlying ugorji behavior that WRP supports
type Decoder interface {
Decode(*Message) error
}

// NewEncoder produces an Encoder using the appropriate WRP configuration for
// the given format.
func NewEncoder(output io.Writer, f Format) Encoder {
switch f {
case JSON:
return &jsonEncoder{enc: json.NewEncoder(output)}
case Msgpack:
return &msgpackHandle
return &msgpEncoder{stream: output}
}

return nil

Check warning on line 103 in format.go

View check run for this annotation

Codecov / codecov/patch

format.go#L103

Added line #L103 was not covered by tests
}

// NewEncoderBytes produces an Encoder using the appropriate WRP configuration
// for the given format.
func NewEncoderBytes(output *[]byte, f Format) Encoder {
switch f {

Check warning on line 109 in format.go

View check run for this annotation

Codecov / codecov/patch

format.go#L108-L109

Added lines #L108 - L109 were not covered by tests
case JSON:
return &jsonHandle
return &jsonEncoder{enc: json.NewEncoder(bytes.NewBuffer(*output))}
case Msgpack:
return &msgpEncoder{bits: output}

Check warning on line 113 in format.go

View check run for this annotation

Codecov / codecov/patch

format.go#L111-L113

Added lines #L111 - L113 were not covered by tests
}

panic(fmt.Errorf("Invalid format constant: %d", f))
return nil

Check warning on line 116 in format.go

View check run for this annotation

Codecov / codecov/patch

format.go#L116

Added line #L116 was not covered by tests
}

// EncodeListener can be implemented on any type passed to an Encoder in order
// to get notified when an encoding happens. This interface is useful to set
// mandatory fields, such as message type.
type EncodeListener interface {
BeforeEncode() error
type jsonEncoder struct {
enc *json.Encoder
}

// Encoder represents the underlying ugorji behavior that WRP supports
type Encoder interface {
Encode(interface{}) error
Reset(io.Writer)
ResetBytes(*[]byte)
func (e *jsonEncoder) Encode(msg *Message) error {
return e.enc.Encode(msg)
}

// encoderDecorator wraps a ugorji Encoder and implements the wrp.Encoder interface.
type encoderDecorator struct {
*codec.Encoder
type msgpEncoder struct {
bits *[]byte
stream io.Writer
}

// Encode checks to see if value implements EncoderTo and if it does, uses the
// value.EncodeTo() method. Otherwise, the value is passed as is to the decorated
// ugorji Encoder.
func (ed *encoderDecorator) Encode(value interface{}) error {
if listener, ok := value.(EncodeListener); ok {
if err := listener.BeforeEncode(); err != nil {
func (e *msgpEncoder) Encode(msg *Message) error {
if e.stream != nil {
got, err := msg.MarshalMsg(nil)
if err != nil {
return err
}
_, err = e.stream.Write(got)
return err
}

return ed.Encoder.Encode(value)
_, err := msg.MarshalMsg(*e.bits)
return err

Check warning on line 143 in format.go

View check run for this annotation

Codecov / codecov/patch

format.go#L142-L143

Added lines #L142 - L143 were not covered by tests
}

// Decoder represents the underlying ugorji behavior that WRP supports
type Decoder interface {
Decode(interface{}) error
Reset(io.Reader)
ResetBytes([]byte)
}

// NewEncoder produces a ugorji Encoder using the appropriate WRP configuration
// NewDecoder produces a ugorji Decoder using the appropriate WRP configuration
// for the given format
func NewEncoder(output io.Writer, f Format) Encoder {
return &encoderDecorator{
codec.NewEncoder(output, f.handle()),
func NewDecoder(input io.Reader, f Format) Decoder {
switch f {
case JSON:
d := json.NewDecoder(input)
d.UseNumber()
return &jsonDecoder{dec: d}
case Msgpack:
return &msgpDecoder{stream: input}
}

return nil

Check warning on line 158 in format.go

View check run for this annotation

Codecov / codecov/patch

format.go#L158

Added line #L158 was not covered by tests
}

// NewEncoderBytes produces a ugorji Encoder using the appropriate WRP configuration
// NewDecoderBytes produces a ugorji Decoder using the appropriate WRP configuration
// for the given format
func NewEncoderBytes(output *[]byte, f Format) Encoder {
return &encoderDecorator{
codec.NewEncoderBytes(output, f.handle()),
func NewDecoderBytes(input []byte, f Format) Decoder {
switch f {
case JSON:
return &jsonDecoder{dec: json.NewDecoder(bytes.NewReader(input))}

Check warning on line 166 in format.go

View check run for this annotation

Codecov / codecov/patch

format.go#L165-L166

Added lines #L165 - L166 were not covered by tests
case Msgpack:
return &msgpDecoder{bits: input}
}

return nil

Check warning on line 171 in format.go

View check run for this annotation

Codecov / codecov/patch

format.go#L171

Added line #L171 was not covered by tests
}

// NewDecoder produces a ugorji Decoder using the appropriate WRP configuration
// for the given format
func NewDecoder(input io.Reader, f Format) Decoder {
return codec.NewDecoder(input, f.handle())
type jsonDecoder struct {
dec *json.Decoder
}

// NewDecoderBytes produces a ugorji Decoder using the appropriate WRP configuration
// for the given format
func NewDecoderBytes(input []byte, f Format) Decoder {
return codec.NewDecoderBytes(input, f.handle())
func (d *jsonDecoder) Decode(msg *Message) error {
return d.dec.Decode(msg)
}

type msgpDecoder struct {
bits []byte
stream io.Reader
}

func (d *msgpDecoder) Decode(msg *Message) error {
var err error
if d.stream != nil {
d.bits, err = io.ReadAll(d.stream)
if err != nil {
return err
}

Check warning on line 193 in format.go

View check run for this annotation

Codecov / codecov/patch

format.go#L192-L193

Added lines #L192 - L193 were not covered by tests
}
_, err = msg.UnmarshalMsg(d.bits)

return err
}

// TranscodeMessage converts a WRP message of any type from one format into another,
Expand All @@ -197,7 +212,7 @@ func TranscodeMessage(target Encoder, source Decoder) (msg *Message, err error)

// MustEncode is a convenience function that attempts to encode a given message. A panic
// is raised on any error. This function is handy for package initialization.
func MustEncode(message interface{}, f Format) []byte {
func MustEncode(message *Message, f Format) []byte {

Check warning on line 215 in format.go

View check run for this annotation

Codecov / codecov/patch

format.go#L215

Added line #L215 was not covered by tests
var (
output bytes.Buffer
encoder = NewEncoder(&output, f)
Expand Down
55 changes: 18 additions & 37 deletions format_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
"crypto/rand"
"encoding/hex"
"fmt"
"reflect"
"testing"

// "github.com/k0kubun/pp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
// "zappem.net/pub/debug/xxd"
)

func testPayload(t *testing.T, payload []byte) {
Expand All @@ -25,15 +26,13 @@ func testPayload(t *testing.T, payload []byte) {

decoded Message

output bytes.Buffer
encoder = NewEncoder(nil, Msgpack)
decoder = NewDecoder(nil, Msgpack)
output bytes.Buffer
)

encoder.Reset(&output)
encoder := NewEncoder(&output, Msgpack)
require.NoError(encoder.Encode(&original))

decoder.Reset(&output)
decoder := NewDecoder(&output, Msgpack)
require.NoError(decoder.Decode(&decoded))

// don't output the payload if it's a ridiculous size
Expand Down Expand Up @@ -210,14 +209,6 @@ func testFormatString(t *testing.T) {
assert.NotEqual(JSON.String(), Msgpack.String())
}

func testFormatHandle(t *testing.T) {
assert := assert.New(t)

assert.NotNil(JSON.handle())
assert.NotNil(Msgpack.handle())
assert.Panics(func() { Format(999).handle() })
}

func testFormatContentType(t *testing.T) {
assert := assert.New(t)

Expand All @@ -229,23 +220,13 @@ func testFormatContentType(t *testing.T) {

func TestFormat(t *testing.T) {
t.Run("String", testFormatString)
t.Run("Handle", testFormatHandle)
t.Run("ContentType", testFormatContentType)
}

// testTranscodeMessage expects a nonpointer reference to a WRP message struct as the original parameter
func testTranscodeMessage(t *testing.T, target, source Format, original interface{}) {
var (
assert = assert.New(t)
require = require.New(t)

originalValue = reflect.ValueOf(original)
encodeValue = reflect.New(originalValue.Type())
decodeValue = reflect.New(originalValue.Type())
)

// encodeValue is now a pointer to a copy of the original
encodeValue.Elem().Set(originalValue)
func testTranscodeMessage(t *testing.T, target, source Format, original Message) {
assert := assert.New(t)
require := require.New(t)

var (
sourceBuffer bytes.Buffer
Expand All @@ -258,15 +239,16 @@ func testTranscodeMessage(t *testing.T, target, source Format, original interfac
)

// create the input first
require.NoError(sourceEncoder.Encode(encodeValue.Interface()))
require.NoError(sourceEncoder.Encode(&original))

// now we can attempt the transcode
message, err := TranscodeMessage(targetEncoder, sourceDecoder)
assert.NotNil(message)
assert.NoError(err)

assert.NoError(targetDecoder.Decode(decodeValue.Interface()))
assert.Equal(encodeValue.Elem().Interface(), decodeValue.Elem().Interface())
var got Message
assert.NoError(targetDecoder.Decode(&got))
assert.Equal(original, got)
}

func TestTranscodeMessage(t *testing.T) {
Expand All @@ -275,13 +257,12 @@ func TestTranscodeMessage(t *testing.T) {
expectedRequestDeliveryResponse int64 = -1234

messages = []Message{
Message{},
Message{
{},
{
Source: "foobar.com",
Destination: "mac:FFEEDDCCBBAA",
Payload: []byte("hi!"),
},
Message{
}, {
Source: "foobar.com",
Destination: "mac:FFEEDDCCBBAA",
ContentType: MimeTypeWrp,
Expand All @@ -292,13 +273,13 @@ func TestTranscodeMessage(t *testing.T) {
Metadata: map[string]string{"hi": "there"},
Payload: []byte("hi!"),
},
Message{},
Message{
{},
{
Source: "foobar.com",
Destination: "mac:FFEEDDCCBBAA",
Payload: []byte("hi!"),
},
Message{
{
Source: "foobar.com",
Destination: "mac:FFEEDDCCBBAA",
ContentType: MimeTypeWrp,
Expand Down
8 changes: 7 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,19 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/google/uuid v1.6.0
github.com/stretchr/testify v1.9.0
github.com/ugorji/go/codec v1.2.12
github.com/tinylib/msgp v1.2.5
zappem.net/pub/debug/xxd v1.0.0
)

require (
github.com/k0kubun/pp v3.0.1+incompatible // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/philhofer/fwd v1.1.3-0.20240916144458-20a13a1f6b7c // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
golang.org/x/sys v0.29.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit e5b1e6a

Please sign in to comment.