@@ -2,11 +2,16 @@ package encoding
2
2
3
3
import (
4
4
"bytes"
5
+ "context"
5
6
"fmt"
7
+ "reflect"
6
8
"sync"
9
+ "time"
7
10
8
11
"github.com/klauspost/compress/zstd"
9
12
cbg "github.com/whyrusleeping/cbor-gen"
13
+ "go.opentelemetry.io/otel/attribute"
14
+ "go.opentelemetry.io/otel/metric"
10
15
)
11
16
12
17
// maxDecompressedSize is the default maximum amount of memory allocated by the
@@ -37,15 +42,29 @@ func NewCBOR[T CBORMarshalUnmarshaler]() *CBOR[T] {
37
42
return & CBOR [T ]{}
38
43
}
39
44
40
- func (c * CBOR [T ]) Encode (m T ) ([]byte , error ) {
45
+ func (c * CBOR [T ]) Encode (m T ) (_ []byte , _err error ) {
46
+ defer func (start time.Time ) {
47
+ if _err != nil {
48
+ metrics .encodingTime .Record (context .Background (),
49
+ time .Since (start ).Seconds (),
50
+ metric .WithAttributeSet (attrSetCborEncode ))
51
+ }
52
+ }(time .Now ())
41
53
var out bytes.Buffer
42
54
if err := m .MarshalCBOR (& out ); err != nil {
43
55
return nil , err
44
56
}
45
57
return out .Bytes (), nil
46
58
}
47
59
48
- func (c * CBOR [T ]) Decode (v []byte , t T ) error {
60
+ func (c * CBOR [T ]) Decode (v []byte , t T ) (_err error ) {
61
+ defer func (start time.Time ) {
62
+ if _err != nil {
63
+ metrics .encodingTime .Record (context .Background (),
64
+ time .Since (start ).Seconds (),
65
+ metric .WithAttributeSet (attrSetCborDecode ))
66
+ }
67
+ }(time .Now ())
49
68
r := bytes .NewReader (v )
50
69
return t .UnmarshalCBOR (r )
51
70
}
@@ -54,6 +73,9 @@ type ZSTD[T CBORMarshalUnmarshaler] struct {
54
73
cborEncoding * CBOR [T ]
55
74
compressor * zstd.Encoder
56
75
decompressor * zstd.Decoder
76
+
77
+ metricAttr attribute.KeyValue
78
+ metricAttrLoader sync.Once
57
79
}
58
80
59
81
func NewZSTD [T CBORMarshalUnmarshaler ]() (* ZSTD [T ], error ) {
@@ -74,26 +96,67 @@ func NewZSTD[T CBORMarshalUnmarshaler]() (*ZSTD[T], error) {
74
96
}, nil
75
97
}
76
98
77
- func (c * ZSTD [T ]) Encode (m T ) ([]byte , error ) {
78
- cborEncoded , err := c .cborEncoding .Encode (m )
79
- if len (cborEncoded ) > maxDecompressedSize {
99
+ func (c * ZSTD [T ]) Encode (t T ) (_ []byte , _err error ) {
100
+ decompressed , err := c .cborEncoding .Encode (t )
101
+ if len (decompressed ) > maxDecompressedSize {
80
102
// Error out early if the encoded value is too large to be decompressed.
81
- return nil , fmt .Errorf ("encoded value cannot exceed maximum size: %d > %d" , len (cborEncoded ), maxDecompressedSize )
103
+ return nil , fmt .Errorf ("encoded value cannot exceed maximum size: %d > %d" , len (decompressed ), maxDecompressedSize )
82
104
}
83
105
if err != nil {
84
106
return nil , err
85
107
}
86
- compressed := c .compressor .EncodeAll (cborEncoded , make ([]byte , 0 , len (cborEncoded )))
108
+
109
+ compressed := c .compress (decompressed )
110
+ c .meterCompactionRate (t , len (decompressed ), len (compressed ))
87
111
return compressed , nil
88
112
}
89
113
90
- func (c * ZSTD [T ]) Decode (v []byte , t T ) error {
114
+ func (c * ZSTD [T ]) Decode (compressed []byte , t T ) error {
91
115
buf := bufferPool .Get ().(* []byte )
92
116
defer bufferPool .Put (buf )
93
117
94
- cborEncoded , err := c .decompressor . DecodeAll ( v , (* buf )[:0 ])
118
+ decompressed , err := c .decompressInto ( compressed , (* buf )[:0 ])
95
119
if err != nil {
96
120
return err
97
121
}
98
- return c .cborEncoding .Decode (cborEncoded , t )
122
+ c .meterCompactionRate (t , len (decompressed ), len (compressed ))
123
+ return c .cborEncoding .Decode (decompressed , t )
124
+ }
125
+
126
+ func (c * ZSTD [T ]) compress (decompressed []byte ) []byte {
127
+ defer func (start time.Time ) {
128
+ metrics .encodingTime .Record (context .Background (),
129
+ time .Since (start ).Seconds (),
130
+ metric .WithAttributeSet (attrSetZstdEncode ))
131
+ }(time .Now ())
132
+ return c .compressor .EncodeAll (decompressed , make ([]byte , 0 , len (decompressed )))
133
+ }
134
+
135
+ func (c * ZSTD [T ]) decompressInto (compressed []byte , buf []byte ) (_ []byte , _err error ) {
136
+ defer func (start time.Time ) {
137
+ if _err != nil {
138
+ metrics .encodingTime .Record (context .Background (),
139
+ time .Since (start ).Seconds (),
140
+ metric .WithAttributeSet (attrSetZstdDecode ))
141
+ }
142
+ }(time .Now ())
143
+ return c .decompressor .DecodeAll (compressed , buf )
144
+ }
145
+
146
+ func (c * ZSTD [T ]) meterCompactionRate (target T , decompressedSize , compressedSize int ) {
147
+ compactionRatio := float64 (decompressedSize ) / float64 (compressedSize )
148
+ metrics .zstdCompactionRatio .Record (context .Background (), compactionRatio , metric .WithAttributes (c .getMetricAttribute (target )))
149
+ }
150
+
151
+ func (c * ZSTD [T ]) getMetricAttribute (t T ) attribute.KeyValue {
152
+ c .metricAttrLoader .Do (func () {
153
+ const key = "type"
154
+ switch target := reflect .TypeOf (t ); {
155
+ case target .Kind () == reflect .Ptr :
156
+ c .metricAttr = attribute .String (key , target .Elem ().Name ())
157
+ default :
158
+ c .metricAttr = attribute .String (key , target .Name ())
159
+ }
160
+ })
161
+ return c .metricAttr
99
162
}
0 commit comments