Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change the code to use msgp. Add benchmarks. #223

Merged
merged 2 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .reuse/dep5
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ Files: format_string.go
Copyright: SPDX-FileCopyrightText: 2022 Comcast Cable Communications Management, LLC
License: Apache-2.0

Files: messages_codec.go
Files: messages_gen.go
Copyright: SPDX-FileCopyrightText: 2022 Comcast Cable Communications Management, LLC
License: Apache-2.0

Expand Down
165 changes: 90 additions & 75 deletions format.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"strings"

"github.com/ugorji/go/codec"
)

//go:generate go install golang.org/x/tools/cmd/stringer@latest
Expand All @@ -25,7 +24,9 @@
Msgpack Format = iota
JSON
lastFormat
)

const (
MimeTypeMsgpack = "application/msgpack"
MimeTypeJson = "application/json"
MimeTypeOctetStream = "application/octet-stream"
Expand All @@ -40,29 +41,6 @@
return []Format{Msgpack, JSON}
}

var (
jsonHandle = codec.JsonHandle{
// TODO replace `codec.BasicHandle` since it's not meant to be used directly
// nolint:staticcheck
BasicHandle: codec.BasicHandle{
TypeInfos: codec.NewTypeInfos([]string{"json"}),
},
IntegerAsString: 'L',
}

// msgpackHandle uses the configuration required for the updated msgpack spec.
// this is what's required to ensure that the Payload field is encoded and decoded properly.
// See: http://ugorji.net/blog/go-codec-primer#format-specific-runtime-configuration
msgpackHandle = codec.MsgpackHandle{
WriteExt: true,
// TODO replace `codec.BasicHandle` since it's not meant to be used directly
// nolint:staticcheck
BasicHandle: codec.BasicHandle{
TypeInfos: codec.NewTypeInfos([]string{"json"}),
},
}
)

// ContentType returns the MIME type associated with this format
func (f Format) ContentType() string {
switch f {
Expand Down Expand Up @@ -102,84 +80,121 @@
return Format(-1), fmt.Errorf("invalid WRP content type: %s", contentType)
}

// handle looks up the appropriate codec.Handle for this format constant.
// This method panics if the format is not a valid value.
func (f Format) handle() codec.Handle {
// Encoder represents the underlying ugorji behavior that WRP supports
type Encoder interface {
Encode(*Message) error
}

// Decoder represents the underlying ugorji behavior that WRP supports
type Decoder interface {
Decode(*Message) error
}

// NewEncoder produces an Encoder using the appropriate WRP configuration for
// the given format.
func NewEncoder(output io.Writer, f Format) Encoder {
switch f {
case JSON:
return &jsonEncoder{enc: json.NewEncoder(output)}
case Msgpack:
return &msgpackHandle
return &msgpEncoder{stream: output}
}

return nil

Check warning on line 103 in format.go

View check run for this annotation

Codecov / codecov/patch

format.go#L103

Added line #L103 was not covered by tests
}

// NewEncoderBytes produces an Encoder using the appropriate WRP configuration
// for the given format.
func NewEncoderBytes(output *[]byte, f Format) Encoder {
switch f {

Check warning on line 109 in format.go

View check run for this annotation

Codecov / codecov/patch

format.go#L108-L109

Added lines #L108 - L109 were not covered by tests
case JSON:
return &jsonHandle
return &jsonEncoder{enc: json.NewEncoder(bytes.NewBuffer(*output))}
case Msgpack:
return &msgpEncoder{bits: output}

Check warning on line 113 in format.go

View check run for this annotation

Codecov / codecov/patch

format.go#L111-L113

Added lines #L111 - L113 were not covered by tests
}

panic(fmt.Errorf("Invalid format constant: %d", f))
return nil

Check warning on line 116 in format.go

View check run for this annotation

Codecov / codecov/patch

format.go#L116

Added line #L116 was not covered by tests
}

// EncodeListener can be implemented on any type passed to an Encoder in order
// to get notified when an encoding happens. This interface is useful to set
// mandatory fields, such as message type.
type EncodeListener interface {
BeforeEncode() error
type jsonEncoder struct {
enc *json.Encoder
}

// Encoder represents the underlying ugorji behavior that WRP supports
type Encoder interface {
Encode(interface{}) error
Reset(io.Writer)
ResetBytes(*[]byte)
func (e *jsonEncoder) Encode(msg *Message) error {
return e.enc.Encode(msg)
}

// encoderDecorator wraps a ugorji Encoder and implements the wrp.Encoder interface.
type encoderDecorator struct {
*codec.Encoder
type msgpEncoder struct {
bits *[]byte
stream io.Writer
}

// Encode checks to see if value implements EncoderTo and if it does, uses the
// value.EncodeTo() method. Otherwise, the value is passed as is to the decorated
// ugorji Encoder.
func (ed *encoderDecorator) Encode(value interface{}) error {
if listener, ok := value.(EncodeListener); ok {
if err := listener.BeforeEncode(); err != nil {
func (e *msgpEncoder) Encode(msg *Message) error {
if e.stream != nil {
got, err := msg.MarshalMsg(nil)
if err != nil {
return err
}
_, err = e.stream.Write(got)
return err
}

return ed.Encoder.Encode(value)
_, err := msg.MarshalMsg(*e.bits)
return err

Check warning on line 143 in format.go

View check run for this annotation

Codecov / codecov/patch

format.go#L142-L143

Added lines #L142 - L143 were not covered by tests
}

// Decoder represents the underlying ugorji behavior that WRP supports
type Decoder interface {
Decode(interface{}) error
Reset(io.Reader)
ResetBytes([]byte)
}

// NewEncoder produces a ugorji Encoder using the appropriate WRP configuration
// NewDecoder produces a ugorji Decoder using the appropriate WRP configuration
// for the given format
func NewEncoder(output io.Writer, f Format) Encoder {
return &encoderDecorator{
codec.NewEncoder(output, f.handle()),
func NewDecoder(input io.Reader, f Format) Decoder {
switch f {
case JSON:
d := json.NewDecoder(input)
d.UseNumber()
return &jsonDecoder{dec: d}
case Msgpack:
return &msgpDecoder{stream: input}
}

return nil

Check warning on line 158 in format.go

View check run for this annotation

Codecov / codecov/patch

format.go#L158

Added line #L158 was not covered by tests
}

// NewEncoderBytes produces a ugorji Encoder using the appropriate WRP configuration
// NewDecoderBytes produces a ugorji Decoder using the appropriate WRP configuration
// for the given format
func NewEncoderBytes(output *[]byte, f Format) Encoder {
return &encoderDecorator{
codec.NewEncoderBytes(output, f.handle()),
func NewDecoderBytes(input []byte, f Format) Decoder {
switch f {
case JSON:
return &jsonDecoder{dec: json.NewDecoder(bytes.NewReader(input))}

Check warning on line 166 in format.go

View check run for this annotation

Codecov / codecov/patch

format.go#L165-L166

Added lines #L165 - L166 were not covered by tests
case Msgpack:
return &msgpDecoder{bits: input}
}

return nil

Check warning on line 171 in format.go

View check run for this annotation

Codecov / codecov/patch

format.go#L171

Added line #L171 was not covered by tests
}

// NewDecoder produces a ugorji Decoder using the appropriate WRP configuration
// for the given format
func NewDecoder(input io.Reader, f Format) Decoder {
return codec.NewDecoder(input, f.handle())
type jsonDecoder struct {
dec *json.Decoder
}

// NewDecoderBytes produces a ugorji Decoder using the appropriate WRP configuration
// for the given format
func NewDecoderBytes(input []byte, f Format) Decoder {
return codec.NewDecoderBytes(input, f.handle())
func (d *jsonDecoder) Decode(msg *Message) error {
return d.dec.Decode(msg)
}

type msgpDecoder struct {
bits []byte
stream io.Reader
}

func (d *msgpDecoder) Decode(msg *Message) error {
var err error
if d.stream != nil {
d.bits, err = io.ReadAll(d.stream)
if err != nil {
return err
}

Check warning on line 193 in format.go

View check run for this annotation

Codecov / codecov/patch

format.go#L192-L193

Added lines #L192 - L193 were not covered by tests
}
_, err = msg.UnmarshalMsg(d.bits)

return err
}

// TranscodeMessage converts a WRP message of any type from one format into another,
Expand All @@ -197,7 +212,7 @@

// MustEncode is a convenience function that attempts to encode a given message. A panic
// is raised on any error. This function is handy for package initialization.
func MustEncode(message interface{}, f Format) []byte {
func MustEncode(message *Message, f Format) []byte {

Check warning on line 215 in format.go

View check run for this annotation

Codecov / codecov/patch

format.go#L215

Added line #L215 was not covered by tests
var (
output bytes.Buffer
encoder = NewEncoder(&output, f)
Expand Down
55 changes: 18 additions & 37 deletions format_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
"crypto/rand"
"encoding/hex"
"fmt"
"reflect"
"testing"

// "github.com/k0kubun/pp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
// "zappem.net/pub/debug/xxd"
)

func testPayload(t *testing.T, payload []byte) {
Expand All @@ -25,15 +26,13 @@ func testPayload(t *testing.T, payload []byte) {

decoded Message

output bytes.Buffer
encoder = NewEncoder(nil, Msgpack)
decoder = NewDecoder(nil, Msgpack)
output bytes.Buffer
)

encoder.Reset(&output)
encoder := NewEncoder(&output, Msgpack)
require.NoError(encoder.Encode(&original))

decoder.Reset(&output)
decoder := NewDecoder(&output, Msgpack)
require.NoError(decoder.Decode(&decoded))

// don't output the payload if it's a ridiculous size
Expand Down Expand Up @@ -210,14 +209,6 @@ func testFormatString(t *testing.T) {
assert.NotEqual(JSON.String(), Msgpack.String())
}

func testFormatHandle(t *testing.T) {
assert := assert.New(t)

assert.NotNil(JSON.handle())
assert.NotNil(Msgpack.handle())
assert.Panics(func() { Format(999).handle() })
}

func testFormatContentType(t *testing.T) {
assert := assert.New(t)

Expand All @@ -229,23 +220,13 @@ func testFormatContentType(t *testing.T) {

func TestFormat(t *testing.T) {
t.Run("String", testFormatString)
t.Run("Handle", testFormatHandle)
t.Run("ContentType", testFormatContentType)
}

// testTranscodeMessage expects a nonpointer reference to a WRP message struct as the original parameter
func testTranscodeMessage(t *testing.T, target, source Format, original interface{}) {
var (
assert = assert.New(t)
require = require.New(t)

originalValue = reflect.ValueOf(original)
encodeValue = reflect.New(originalValue.Type())
decodeValue = reflect.New(originalValue.Type())
)

// encodeValue is now a pointer to a copy of the original
encodeValue.Elem().Set(originalValue)
func testTranscodeMessage(t *testing.T, target, source Format, original Message) {
assert := assert.New(t)
require := require.New(t)

var (
sourceBuffer bytes.Buffer
Expand All @@ -258,15 +239,16 @@ func testTranscodeMessage(t *testing.T, target, source Format, original interfac
)

// create the input first
require.NoError(sourceEncoder.Encode(encodeValue.Interface()))
require.NoError(sourceEncoder.Encode(&original))

// now we can attempt the transcode
message, err := TranscodeMessage(targetEncoder, sourceDecoder)
assert.NotNil(message)
assert.NoError(err)

assert.NoError(targetDecoder.Decode(decodeValue.Interface()))
assert.Equal(encodeValue.Elem().Interface(), decodeValue.Elem().Interface())
var got Message
assert.NoError(targetDecoder.Decode(&got))
assert.Equal(original, got)
}

func TestTranscodeMessage(t *testing.T) {
Expand All @@ -275,13 +257,12 @@ func TestTranscodeMessage(t *testing.T) {
expectedRequestDeliveryResponse int64 = -1234

messages = []Message{
Message{},
Message{
{},
{
Source: "foobar.com",
Destination: "mac:FFEEDDCCBBAA",
Payload: []byte("hi!"),
},
Message{
}, {
Source: "foobar.com",
Destination: "mac:FFEEDDCCBBAA",
ContentType: MimeTypeWrp,
Expand All @@ -292,13 +273,13 @@ func TestTranscodeMessage(t *testing.T) {
Metadata: map[string]string{"hi": "there"},
Payload: []byte("hi!"),
},
Message{},
Message{
{},
{
Source: "foobar.com",
Destination: "mac:FFEEDDCCBBAA",
Payload: []byte("hi!"),
},
Message{
{
Source: "foobar.com",
Destination: "mac:FFEEDDCCBBAA",
ContentType: MimeTypeWrp,
Expand Down
8 changes: 7 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,19 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/google/uuid v1.6.0
github.com/stretchr/testify v1.9.0
github.com/ugorji/go/codec v1.2.12
github.com/tinylib/msgp v1.2.5
zappem.net/pub/debug/xxd v1.0.0
)

require (
github.com/k0kubun/pp v3.0.1+incompatible // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/philhofer/fwd v1.1.3-0.20240916144458-20a13a1f6b7c // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
golang.org/x/sys v0.29.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading