From f5f1866cc91dcb7ae47337d869fc354338c17be3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juraci=20Paix=C3=A3o=20Kr=C3=B6hling?= Date: Wed, 5 Jun 2024 15:40:14 +0200 Subject: [PATCH] [configgrpc] Use own compressors for zstd (#10323) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Uses our own version of the zstd compressor for gRPC servers. The code for it is based on the gzip compressor that comes built-in with gRPC. Benchmarks before this PR: ``` Running tool: /usr/bin/go test -benchmem -run=^$ -bench ^BenchmarkCompressors$ go.opentelemetry.io/collector/config/configgrpc sm_log_requestgoos: linux goarch: amd64 pkg: go.opentelemetry.io/collector/config/configgrpc cpu: 11th Gen Intel(R) Core(TM) i7-11800H @ 2.30GHz BenchmarkCompressors/sm_log_request/raw_bytes_160/compressed_bytes_162/compressor_gzip-16 71594 19066 ns/op 615 B/op 4 allocs/op sm_log_requestBenchmarkCompressors/sm_log_request/raw_bytes_160/compressed_bytes_159/compressor_zstd-16 151503 8544 ns/op 640 B/op 6 allocs/op sm_log_requestBenchmarkCompressors/sm_log_request/raw_bytes_160/compressed_bytes_178/compressor_snappy-16 3632570 303.8 ns/op 304 B/op 3 allocs/op md_log_requestBenchmarkCompressors/md_log_request/raw_bytes_242/compressed_bytes_219/compressor_gzip-16 68114 16938 ns/op 748 B/op 4 allocs/op md_log_requestBenchmarkCompressors/md_log_request/raw_bytes_242/compressed_bytes_209/compressor_zstd-16 138091 8047 ns/op 896 B/op 6 allocs/op md_log_requestBenchmarkCompressors/md_log_request/raw_bytes_242/compressed_bytes_260/compressor_snappy-16 3081198 402.5 ns/op 400 B/op 3 allocs/op lg_log_requestBenchmarkCompressors/lg_log_request/raw_bytes_4850/compressed_bytes_253/compressor_gzip-16 43414 27174 ns/op 386 B/op 3 allocs/op lg_log_requestBenchmarkCompressors/lg_log_request/raw_bytes_4850/compressed_bytes_216/compressor_zstd-16 117534 9903 ns/op 10112 B/op 6 allocs/op 
lg_log_requestBenchmarkCompressors/lg_log_request/raw_bytes_4850/compressed_bytes_454/compressor_snappy-16 1000000 1190 ns/op 528 B/op 2 allocs/op sm_trace_requestBenchmarkCompressors/sm_trace_request/raw_bytes_231/compressed_bytes_203/compressor_gzip-16 67275 17508 ns/op 700 B/op 4 allocs/op sm_trace_requestBenchmarkCompressors/sm_trace_request/raw_bytes_231/compressed_bytes_201/compressor_zstd-16 196862 6137 ns/op 848 B/op 6 allocs/op sm_trace_requestBenchmarkCompressors/sm_trace_request/raw_bytes_231/compressed_bytes_220/compressor_snappy-16 3595815 331.7 ns/op 272 B/op 2 allocs/op md_trace_requestBenchmarkCompressors/md_trace_request/raw_bytes_329/compressed_bytes_249/compressor_gzip-16 64105 19104 ns/op 844 B/op 4 allocs/op md_trace_requestBenchmarkCompressors/md_trace_request/raw_bytes_329/compressed_bytes_256/compressor_zstd-16 169221 6929 ns/op 1120 B/op 6 allocs/op md_trace_requestBenchmarkCompressors/md_trace_request/raw_bytes_329/compressed_bytes_279/compressor_snappy-16 2602239 473.0 ns/op 336 B/op 2 allocs/op lg_trace_requestBenchmarkCompressors/lg_trace_request/raw_bytes_7025/compressed_bytes_303/compressor_gzip-16 33861 36473 ns/op 904 B/op 4 allocs/op lg_trace_requestBenchmarkCompressors/lg_trace_request/raw_bytes_7025/compressed_bytes_258/compressor_zstd-16 107828 10596 ns/op 16832 B/op 6 allocs/op lg_trace_requestBenchmarkCompressors/lg_trace_request/raw_bytes_7025/compressed_bytes_591/compressor_snappy-16 725080 1540 ns/op 689 B/op 2 allocs/op sm_metric_requestBenchmarkCompressors/sm_metric_request/raw_bytes_183/compressed_bytes_140/compressor_gzip-16 76315 16394 ns/op 496 B/op 4 allocs/op sm_metric_requestBenchmarkCompressors/sm_metric_request/raw_bytes_183/compressed_bytes_137/compressor_zstd-16 193314 5957 ns/op 688 B/op 6 allocs/op sm_metric_requestBenchmarkCompressors/sm_metric_request/raw_bytes_183/compressed_bytes_152/compressor_snappy-16 3558649 345.2 ns/op 208 B/op 2 allocs/op 
md_metric_requestBenchmarkCompressors/md_metric_request/raw_bytes_376/compressed_bytes_194/compressor_gzip-16 68497 18413 ns/op 699 B/op 4 allocs/op md_metric_requestBenchmarkCompressors/md_metric_request/raw_bytes_376/compressed_bytes_198/compressor_zstd-16 177841 6520 ns/op 1136 B/op 6 allocs/op md_metric_requestBenchmarkCompressors/md_metric_request/raw_bytes_376/compressed_bytes_222/compressor_snappy-16 2354102 497.4 ns/op 272 B/op 2 allocs/op lg_metric_requestBenchmarkCompressors/lg_metric_request/raw_bytes_10991/compressed_bytes_601/compressor_gzip-16 21943 54603 ns/op 1941 B/op 5 allocs/op lg_metric_requestBenchmarkCompressors/lg_metric_request/raw_bytes_10991/compressed_bytes_559/compressor_zstd-16 71260 16077 ns/op 25312 B/op 6 allocs/op lg_metric_requestBenchmarkCompressors/lg_metric_request/raw_bytes_10991/compressed_bytes_1055/compressor_snappy-16 335415 3026 ns/op 1200 B/op 2 allocs/op PASS ok go.opentelemetry.io/collector/config/configgrpc 37.766s ``` After this version: ``` Running tool: /usr/bin/go test -benchmem -run=^$ -bench ^BenchmarkCompressors$ go.opentelemetry.io/collector/config/configgrpc sm_log_requestgoos: linux goarch: amd64 pkg: go.opentelemetry.io/collector/config/configgrpc cpu: 11th Gen Intel(R) Core(TM) i7-11800H @ 2.30GHz BenchmarkCompressors/sm_log_request/raw_bytes_160/compressed_bytes_162/compressor_gzip-16 74952 15710 ns/op 603 B/op 4 allocs/op sm_log_requestBenchmarkCompressors/sm_log_request/raw_bytes_160/compressed_bytes_159/compressor_zstd-16 156784 6966 ns/op 208 B/op 2 allocs/op sm_log_requestBenchmarkCompressors/sm_log_request/raw_bytes_160/compressed_bytes_178/compressor_snappy-16 2216174 510.4 ns/op 308 B/op 3 allocs/op md_log_requestBenchmarkCompressors/md_log_request/raw_bytes_242/compressed_bytes_219/compressor_gzip-16 68095 18569 ns/op 736 B/op 4 allocs/op md_log_requestBenchmarkCompressors/md_log_request/raw_bytes_242/compressed_bytes_209/compressor_zstd-16 150705 8849 ns/op 294 B/op 2 allocs/op 
md_log_requestBenchmarkCompressors/md_log_request/raw_bytes_242/compressed_bytes_260/compressor_snappy-16 2149710 556.8 ns/op 406 B/op 3 allocs/op lg_log_requestBenchmarkCompressors/lg_log_request/raw_bytes_4850/compressed_bytes_253/compressor_gzip-16 40040 26159 ns/op 368 B/op 3 allocs/op lg_log_requestBenchmarkCompressors/lg_log_request/raw_bytes_4850/compressed_bytes_216/compressor_zstd-16 123043 10254 ns/op 299 B/op 2 allocs/op lg_log_requestBenchmarkCompressors/lg_log_request/raw_bytes_4850/compressed_bytes_454/compressor_snappy-16 726780 1457 ns/op 533 B/op 2 allocs/op sm_trace_requestBenchmarkCompressors/sm_trace_request/raw_bytes_231/compressed_bytes_203/compressor_gzip-16 64660 18186 ns/op 701 B/op 4 allocs/op sm_trace_requestBenchmarkCompressors/sm_trace_request/raw_bytes_231/compressed_bytes_201/compressor_zstd-16 193225 6267 ns/op 273 B/op 2 allocs/op sm_trace_requestBenchmarkCompressors/sm_trace_request/raw_bytes_231/compressed_bytes_220/compressor_snappy-16 2925073 418.2 ns/op 276 B/op 2 allocs/op md_trace_requestBenchmarkCompressors/md_trace_request/raw_bytes_329/compressed_bytes_249/compressor_gzip-16 61320 20641 ns/op 846 B/op 4 allocs/op md_trace_requestBenchmarkCompressors/md_trace_request/raw_bytes_329/compressed_bytes_256/compressor_zstd-16 190965 6440 ns/op 321 B/op 2 allocs/op md_trace_requestBenchmarkCompressors/md_trace_request/raw_bytes_329/compressed_bytes_279/compressor_snappy-16 2051575 656.8 ns/op 341 B/op 2 allocs/op lg_trace_requestBenchmarkCompressors/lg_trace_request/raw_bytes_7025/compressed_bytes_303/compressor_gzip-16 30097 40680 ns/op 907 B/op 4 allocs/op lg_trace_requestBenchmarkCompressors/lg_trace_request/raw_bytes_7025/compressed_bytes_258/compressor_zstd-16 127027 8437 ns/op 363 B/op 2 allocs/op lg_trace_requestBenchmarkCompressors/lg_trace_request/raw_bytes_7025/compressed_bytes_591/compressor_snappy-16 716541 1803 ns/op 694 B/op 2 allocs/op 
sm_metric_requestBenchmarkCompressors/sm_metric_request/raw_bytes_183/compressed_bytes_140/compressor_gzip-16 82287 15054 ns/op 496 B/op 4 allocs/op sm_metric_requestBenchmarkCompressors/sm_metric_request/raw_bytes_183/compressed_bytes_137/compressor_zstd-16 230558 5470 ns/op 221 B/op 2 allocs/op sm_metric_requestBenchmarkCompressors/sm_metric_request/raw_bytes_183/compressed_bytes_152/compressor_snappy-16 2759403 417.1 ns/op 211 B/op 2 allocs/op md_metric_requestBenchmarkCompressors/md_metric_request/raw_bytes_376/compressed_bytes_194/compressor_gzip-16 58208 18925 ns/op 702 B/op 4 allocs/op md_metric_requestBenchmarkCompressors/md_metric_request/raw_bytes_376/compressed_bytes_198/compressor_zstd-16 199226 6247 ns/op 256 B/op 2 allocs/op md_metric_requestBenchmarkCompressors/md_metric_request/raw_bytes_376/compressed_bytes_222/compressor_snappy-16 2065202 609.8 ns/op 276 B/op 2 allocs/op lg_metric_requestBenchmarkCompressors/lg_metric_request/raw_bytes_10991/compressed_bytes_601/compressor_gzip-16 20583 59762 ns/op 1945 B/op 5 allocs/op lg_metric_requestBenchmarkCompressors/lg_metric_request/raw_bytes_10991/compressed_bytes_559/compressor_zstd-16 98254 13152 ns/op 728 B/op 2 allocs/op lg_metric_requestBenchmarkCompressors/lg_metric_request/raw_bytes_10991/compressed_bytes_1055/compressor_snappy-16 389401 3976 ns/op 1209 B/op 2 allocs/op PASS ok go.opentelemetry.io/collector/config/configgrpc 40.394s ``` Signed-off-by: Juraci Paixão Kröhling --------- Signed-off-by: Juraci Paixão Kröhling --- ...nfiggrpc-use-own-compressors-for-zstd.yaml | 13 +++ config/configgrpc/configgrpc.go | 4 +- .../configgrpc/configgrpc_benchmark_test.go | 4 +- config/configgrpc/go.mod | 2 +- config/configgrpc/internal/zstd.go | 83 +++++++++++++++++++ config/configgrpc/internal/zstd_test.go | 41 +++++++++ receiver/otlpreceiver/otlp_test.go | 3 +- 7 files changed, 144 insertions(+), 6 deletions(-) create mode 100644 .chloggen/jpkroehling-configgrpc-use-own-compressors-for-zstd.yaml create 
mode 100644 config/configgrpc/internal/zstd.go create mode 100644 config/configgrpc/internal/zstd_test.go diff --git a/.chloggen/jpkroehling-configgrpc-use-own-compressors-for-zstd.yaml b/.chloggen/jpkroehling-configgrpc-use-own-compressors-for-zstd.yaml new file mode 100644 index 00000000000..a04c4f89012 --- /dev/null +++ b/.chloggen/jpkroehling-configgrpc-use-own-compressors-for-zstd.yaml @@ -0,0 +1,13 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'bug_fix' + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: configgrpc + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Use own compressors for zstd. Before this change, the zstd compressor we used didn't respect the max message size. + +# One or more tracking issues or pull requests related to the change +issues: [10323] diff --git a/config/configgrpc/configgrpc.go b/config/configgrpc/configgrpc.go index 98d428857ce..b57a199461c 100644 --- a/config/configgrpc/configgrpc.go +++ b/config/configgrpc/configgrpc.go @@ -12,7 +12,6 @@ import ( "time" "github.com/mostynb/go-grpc-compression/nonclobbering/snappy" - "github.com/mostynb/go-grpc-compression/nonclobbering/zstd" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel" "google.golang.org/grpc" @@ -28,6 +27,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configauth" "go.opentelemetry.io/collector/config/configcompression" + grpcInternal "go.opentelemetry.io/collector/config/configgrpc/internal" "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/config/configtelemetry" @@ -426,7 +426,7 @@ func getGRPCCompressionName(compressionType 
configcompression.Type) (string, err case configcompression.TypeSnappy: return snappy.Name, nil case configcompression.TypeZstd: - return zstd.Name, nil + return grpcInternal.ZstdName, nil default: return "", fmt.Errorf("unsupported compression type %q", compressionType) } diff --git a/config/configgrpc/configgrpc_benchmark_test.go b/config/configgrpc/configgrpc_benchmark_test.go index 1ad755f2b4f..3254655e9ec 100644 --- a/config/configgrpc/configgrpc_benchmark_test.go +++ b/config/configgrpc/configgrpc_benchmark_test.go @@ -10,12 +10,12 @@ import ( "testing" "github.com/mostynb/go-grpc-compression/nonclobbering/snappy" - "github.com/mostynb/go-grpc-compression/nonclobbering/zstd" "google.golang.org/grpc/codes" "google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/status" + "go.opentelemetry.io/collector/config/configgrpc/internal" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" @@ -27,7 +27,7 @@ func BenchmarkCompressors(b *testing.B) { compressors := make([]encoding.Compressor, 0) compressors = append(compressors, encoding.GetCompressor(gzip.Name)) - compressors = append(compressors, encoding.GetCompressor(zstd.Name)) + compressors = append(compressors, encoding.GetCompressor(internal.ZstdName)) compressors = append(compressors, encoding.GetCompressor(snappy.Name)) for _, payload := range payloads { diff --git a/config/configgrpc/go.mod b/config/configgrpc/go.mod index cca50c4ffc5..960a0d96376 100644 --- a/config/configgrpc/go.mod +++ b/config/configgrpc/go.mod @@ -3,6 +3,7 @@ module go.opentelemetry.io/collector/config/configgrpc go 1.21.0 require ( + github.com/klauspost/compress v1.17.2 github.com/mostynb/go-grpc-compression v1.2.2 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector v0.102.0 @@ -36,7 +37,6 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/hashicorp/go-version v1.7.0 // indirect 
github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/compress v1.17.2 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect github.com/knadh/koanf/v2 v2.1.1 // indirect diff --git a/config/configgrpc/internal/zstd.go b/config/configgrpc/internal/zstd.go new file mode 100644 index 00000000000..0718b73535f --- /dev/null +++ b/config/configgrpc/internal/zstd.go @@ -0,0 +1,83 @@ +// Copyright The OpenTelemetry Authors +// Copyright 2017 gRPC authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/config/configgrpc/internal" + +import ( + "errors" + "io" + "sync" + + "github.com/klauspost/compress/zstd" + "google.golang.org/grpc/encoding" +) + +const ZstdName = "zstd" + +func init() { + encoding.RegisterCompressor(NewZstdCodec()) +} + +type writer struct { + *zstd.Encoder + pool *sync.Pool +} + +func NewZstdCodec() encoding.Compressor { + c := &compressor{} + c.poolCompressor.New = func() any { + zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1), zstd.WithWindowSize(512*1024)) + return &writer{Encoder: zw, pool: &c.poolCompressor} + } + return c +} + +func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) { + z := c.poolCompressor.Get().(*writer) + z.Encoder.Reset(w) + return z, nil +} + +func (z *writer) Close() error { + defer z.pool.Put(z) + return z.Encoder.Close() +} + +type reader struct { + *zstd.Decoder + pool *sync.Pool +} + +func (c *compressor) Decompress(r io.Reader) (io.Reader, error) { + z, inPool := c.poolDecompressor.Get().(*reader) + if !inPool { + newZ, err := zstd.NewReader(r) + if err != nil { + return nil, err + } + return &reader{Decoder: newZ, pool: &c.poolDecompressor}, nil + } + if err := z.Reset(r); err != nil { + c.poolDecompressor.Put(z) + return nil, err + } + return z, nil +} + +func (z *reader) Read(p []byte) (n int, err error) { + n, err = z.Decoder.Read(p) + if errors.Is(err, 
io.EOF) { + z.pool.Put(z) + } + return n, err +} + +func (c *compressor) Name() string { + return ZstdName +} + +type compressor struct { + poolCompressor sync.Pool + poolDecompressor sync.Pool +} diff --git a/config/configgrpc/internal/zstd_test.go b/config/configgrpc/internal/zstd_test.go new file mode 100644 index 00000000000..e16336c8ccb --- /dev/null +++ b/config/configgrpc/internal/zstd_test.go @@ -0,0 +1,41 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal + +import ( + "bytes" + "io" + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_zstdCodec_CompressDecompress(t *testing.T) { + // prepare + msg := []byte("Hello world.") + compressed := &bytes.Buffer{} + + // zstd header, for sanity checking + header := []byte{40, 181, 47, 253} + + c := NewZstdCodec() + cWriter, err := c.Compress(compressed) + require.NoError(t, err) + require.NotNil(t, cWriter) + + _, err = cWriter.Write(msg) + require.NoError(t, err) + cWriter.Close() + + cReader, err := c.Decompress(compressed) + require.NoError(t, err) + require.NotNil(t, cReader) + + uncompressed, err := io.ReadAll(cReader) + require.NoError(t, err) + require.Equal(t, msg, uncompressed) + + // test header + require.Equal(t, header, compressed.Bytes()[:4]) +} diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index 51f99bb4cb8..14eecbcdac1 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -726,7 +726,8 @@ func TestGRPCMaxRecvSize(t *testing.T) { require.NoError(t, err) td := testdata.GenerateTraces(50000) - require.Error(t, exportTraces(cc, td)) + err = exportTraces(cc, td) + require.Error(t, err) assert.NoError(t, cc.Close()) require.NoError(t, recv.Shutdown(context.Background()))