Measure time spent on encoding and the compaction ratio #871

Merged 1 commit on Feb 4, 2025
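
This change instruments the internal CBOR and zstd codecs with OpenTelemetry metrics: a histogram of the time spent encoding and decoding, labelled by codec and action, and a histogram of the zstd compression ratio, labelled by the concrete message type (resolved once via reflection).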
75 changes: 64 additions & 11 deletions internal/encoding/encoding.go
@@ -2,11 +2,16 @@

 import (
     "bytes"
+    "context"
     "fmt"
+    "reflect"
     "sync"
+    "time"
 
     "github.com/klauspost/compress/zstd"
     cbg "github.com/whyrusleeping/cbor-gen"
+    "go.opentelemetry.io/otel/attribute"
+    "go.opentelemetry.io/otel/metric"
 )
 
 // maxDecompressedSize is the default maximum amount of memory allocated by the
@@ -37,15 +42,29 @@
     return &CBOR[T]{}
 }
 
-func (c *CBOR[T]) Encode(m T) ([]byte, error) {
+func (c *CBOR[T]) Encode(m T) (_ []byte, _err error) {
+    defer func(start time.Time) {
+        if _err != nil {
+            metrics.encodingTime.Record(context.Background(),
+                time.Since(start).Seconds(),
+                metric.WithAttributeSet(attrSetCborEncode))
+        }
+    }(time.Now())
     var out bytes.Buffer
     if err := m.MarshalCBOR(&out); err != nil {
         return nil, err
     }
     return out.Bytes(), nil
 }
 
-func (c *CBOR[T]) Decode(v []byte, t T) error {
+func (c *CBOR[T]) Decode(v []byte, t T) (_err error) {
+    defer func(start time.Time) {
+        if _err != nil {
+            metrics.encodingTime.Record(context.Background(),
+                time.Since(start).Seconds(),
+                metric.WithAttributeSet(attrSetCborDecode))
+        }
+    }(time.Now())
     r := bytes.NewReader(v)
     return t.UnmarshalCBOR(r)
 }
@@ -54,6 +73,9 @@
     cborEncoding *CBOR[T]
     compressor   *zstd.Encoder
     decompressor *zstd.Decoder
+
+    metricAttr       attribute.KeyValue
+    metricAttrLoader sync.Once
 }
 
 func NewZSTD[T CBORMarshalUnmarshaler]() (*ZSTD[T], error) {
@@ -74,26 +96,57 @@
     }, nil
 }
 
-func (c *ZSTD[T]) Encode(m T) ([]byte, error) {
-    cborEncoded, err := c.cborEncoding.Encode(m)
-    if len(cborEncoded) > maxDecompressedSize {
+func (c *ZSTD[T]) Encode(t T) (_ []byte, _err error) {
+    defer func(start time.Time) {
+        metrics.encodingTime.Record(context.Background(),
+            time.Since(start).Seconds(),
+            metric.WithAttributeSet(attrSetZstdEncode))
+    }(time.Now())
+    decompressed, err := c.cborEncoding.Encode(t)
+    if len(decompressed) > maxDecompressedSize {
         // Error out early if the encoded value is too large to be decompressed.
-        return nil, fmt.Errorf("encoded value cannot exceed maximum size: %d > %d", len(cborEncoded), maxDecompressedSize)
+        return nil, fmt.Errorf("encoded value cannot exceed maximum size: %d > %d", len(decompressed), maxDecompressedSize)
     }
     if err != nil {
         return nil, err
     }
-    compressed := c.compressor.EncodeAll(cborEncoded, make([]byte, 0, len(cborEncoded)))
+    compressed := c.compressor.EncodeAll(decompressed, make([]byte, 0, len(decompressed)))
+    c.meterCompressionRatio(len(decompressed), len(compressed))
     return compressed, nil
 }
 
-func (c *ZSTD[T]) Decode(v []byte, t T) error {
+func (c *ZSTD[T]) Decode(compressed []byte, t T) (_err error) {
+    defer func(start time.Time) {
+        if _err != nil {
+            metrics.encodingTime.Record(context.Background(),
+                time.Since(start).Seconds(),
+                metric.WithAttributeSet(attrSetZstdDecode))
+        }
+    }(time.Now())
     buf := bufferPool.Get().(*[]byte)
     defer bufferPool.Put(buf)
 
-    cborEncoded, err := c.decompressor.DecodeAll(v, (*buf)[:0])
+    decompressed, err := c.decompressor.DecodeAll(compressed, (*buf)[:0])
     if err != nil {
         return err
     }
-    return c.cborEncoding.Decode(cborEncoded, t)
+    c.meterCompressionRatio(len(decompressed), len(compressed))
+    return c.cborEncoding.Decode(decompressed, t)
 }
+
+func (c *ZSTD[T]) meterCompressionRatio(decompressedSize, compressedSize int) {
+    compressionRatio := float64(decompressedSize) / float64(compressedSize)
+    metrics.zstdCompressionRatio.Record(context.Background(), compressionRatio, metric.WithAttributes(c.getMetricAttribute()))
+}
+
+func (c *ZSTD[T]) getMetricAttribute() attribute.KeyValue {
+    c.metricAttrLoader.Do(func() {
+        const key = "type"
+        switch target := reflect.TypeFor[T](); {
+        case target.Kind() == reflect.Ptr:
+            c.metricAttr = attribute.String(key, target.Elem().Name())
+        default:
+            c.metricAttr = attribute.String(key, target.Name())
+        }
+    })
+    return c.metricAttr
+}
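
For orientation, below is a minimal usage sketch of the instrumented codec. It is not part of this PR: the message type and its CBOR methods are invented for illustration, and since internal/encoding is an internal package such code would have to live inside the go-f3 module. An Encode call on the ZSTD codec records a latency sample tagged codec="zstd", action="encode" plus a compression-ratio sample tagged with the element type's name; per this diff, the CBOR codec and ZSTD Decode record their latency samples only when an error occurs.

package main

import (
    "fmt"
    "io"

    "github.com/filecoin-project/go-f3/internal/encoding"
)

// message is a hypothetical payload; any type with cbor-gen style
// MarshalCBOR/UnmarshalCBOR methods satisfies CBORMarshalUnmarshaler.
type message struct{ Value uint64 }

// MarshalCBOR writes Value as a CBOR unsigned integer; this sketch only
// handles the single-byte form (values 0..23).
func (m *message) MarshalCBOR(w io.Writer) error {
    if m.Value > 23 {
        return fmt.Errorf("sketch only encodes values 0..23")
    }
    _, err := w.Write([]byte{byte(m.Value)})
    return err
}

// UnmarshalCBOR reads the single-byte CBOR unsigned integer back.
func (m *message) UnmarshalCBOR(r io.Reader) error {
    var b [1]byte
    if _, err := io.ReadFull(r, b[:]); err != nil {
        return err
    }
    if b[0] > 23 {
        return fmt.Errorf("sketch only decodes values 0..23")
    }
    m.Value = uint64(b[0])
    return nil
}

func main() {
    codec, err := encoding.NewZSTD[*message]()
    if err != nil {
        panic(err)
    }

    // Records f3_internal_encoding_time{codec="zstd", action="encode"} and
    // f3_internal_encoding_zstd_compression_ratio{type="message"}.
    compressed, err := codec.Encode(&message{Value: 7})
    if err != nil {
        panic(err)
    }

    // Records the compression ratio again; the decode latency sample is,
    // per this diff, only recorded when decoding fails.
    var out message
    if err := codec.Decode(compressed, &out); err != nil {
        panic(err)
    }
    fmt.Println(out.Value) // prints 7
}
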
7 changes: 7 additions & 0 deletions internal/encoding/encoding_api_test.go
@@ -0,0 +1,7 @@
+package encoding
+
+import "go.opentelemetry.io/otel/attribute"
+
+// GetMetricAttribute returns the attribute for metric collection, exported for
+// testing purposes.
+func (c *ZSTD[T]) GetMetricAttribute() attribute.KeyValue { return c.getMetricAttribute() }
18 changes: 18 additions & 0 deletions internal/encoding/encoding_test.go
@@ -9,6 +9,7 @@ import (
"github.com/klauspost/compress/zstd"
"github.com/stretchr/testify/require"
cbg "github.com/whyrusleeping/cbor-gen"
"go.opentelemetry.io/otel/attribute"
)

var (
@@ -77,3 +78,20 @@ func TestZSTDLimits(t *testing.T) {
     var dest testValue
     require.ErrorContains(t, subject.Decode(tooLargeACompression, &dest), "decompressed size exceeds configured limit")
 }
+
+func TestZSTD_GetMetricAttribute(t *testing.T) {
+    t.Run("By Pointer", func(t *testing.T) {
+        subject, err := encoding.NewZSTD[*testValue]()
+        require.NoError(t, err)
+        require.Equal(t, attribute.String("type", "testValue"), subject.GetMetricAttribute())
+    })
+    t.Run("By Value", func(t *testing.T) {
+        type anotherTestValue struct {
+            cbg.CBORUnmarshaler
+            cbg.CBORMarshaler
+        }
+        subject, err := encoding.NewZSTD[anotherTestValue]()
+        require.NoError(t, err)
+        require.Equal(t, attribute.String("type", "anotherTestValue"), subject.GetMetricAttribute())
+    })
+}
38 changes: 38 additions & 0 deletions internal/encoding/metrics.go
@@ -0,0 +1,38 @@
+package encoding
+
+import (
+    "github.com/filecoin-project/go-f3/internal/measurements"
+    "go.opentelemetry.io/otel"
+    "go.opentelemetry.io/otel/attribute"
+    "go.opentelemetry.io/otel/metric"
+)
+
+var (
+    attrCodecCbor     = attribute.String("codec", "cbor")
+    attrCodecZstd     = attribute.String("codec", "zstd")
+    attrActionEncode  = attribute.String("action", "encode")
+    attrActionDecode  = attribute.String("action", "decode")
+    attrSetCborEncode = attribute.NewSet(attrCodecCbor, attrActionEncode)
+    attrSetCborDecode = attribute.NewSet(attrCodecCbor, attrActionDecode)
+    attrSetZstdEncode = attribute.NewSet(attrCodecZstd, attrActionEncode)
+    attrSetZstdDecode = attribute.NewSet(attrCodecZstd, attrActionDecode)
+
+    meter = otel.Meter("f3/internal/encoding")
+
+    metrics = struct {
+        encodingTime         metric.Float64Histogram
+        zstdCompressionRatio metric.Float64Histogram
+    }{
+        encodingTime: measurements.Must(meter.Float64Histogram(
+            "f3_internal_encoding_time",
+            metric.WithDescription("The time spent on encoding/decoding in seconds."),
+            metric.WithUnit("s"),
+            metric.WithExplicitBucketBoundaries(0.001, 0.003, 0.005, 0.01, 0.03, 0.05, 0.1, 0.3, 0.5, 1.0, 2.0, 5.0, 10.0),
+        )),
+        zstdCompressionRatio: measurements.Must(meter.Float64Histogram(
+            "f3_internal_encoding_zstd_compression_ratio",
metric.WithDescription("The ratio of compressed to uncompressed data size for zstd encoding."),
metric.WithExplicitBucketBoundaries(0.0, 0.1, 0.2, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0),
)),
}
)
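
As a worked example of what these instruments capture (illustrative numbers): a message whose CBOR encoding is 40 KiB and whose zstd output is 10 KiB yields a compression-ratio sample of 40960/10240 = 4.0, while an encode that takes 2 ms falls into the 0.001–0.003 s latency bucket.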