Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switched to DataDog zstd wrapper, reusing the compression ctx #287

Merged
merged 1 commit into from
Jun 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/apache/pulsar-client-go
go 1.12

require (
github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32
github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
github.com/golang/protobuf v1.3.1
Expand All @@ -15,6 +16,5 @@ require (
github.com/spf13/cobra v0.0.3
github.com/spf13/pflag v1.0.3 // indirect
github.com/stretchr/testify v1.4.0
github.com/valyala/gozstd v1.7.0
github.com/yahoo/athenz v1.8.55
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32 h1:/gZKpgSMydtrih81nvUhlkXpZIUfthKShSCVbRzBt9Y=
github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4=
github.com/ardielle/ardielle-go v1.5.2/go.mod h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI=
github.com/ardielle/ardielle-tools v1.5.4/go.mod h1:oZN+JRMnqGiIhrzkRN9l26Cej9dEx4jeNG6A+AdkShk=
Expand Down Expand Up @@ -49,8 +51,6 @@ github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/valyala/gozstd v1.7.0 h1:Ljh5c9zboqLhwTI33al32R72iCZfn0mCbVGcFWbGwRQ=
github.com/valyala/gozstd v1.7.0/go.mod h1:y5Ew47GLlP37EkTB+B4s7r6A5rdaeB7ftbl9zoYiIPQ=
github.com/yahoo/athenz v1.8.55 h1:xGhxN3yLq334APyn0Zvcc+aqu78Q7BBhYJevM3EtTW0=
github.com/yahoo/athenz v1.8.55/go.mod h1:G7LLFUH7Z/r4QAB7FfudfuA7Am/eCzO1GlzBhDL6Kv0=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down
16 changes: 12 additions & 4 deletions pulsar/internal/compression/zstd_cgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,37 @@
package compression

import (
zstd "github.com/valyala/gozstd"
"github.com/DataDog/zstd"
log "github.com/sirupsen/logrus"
)

type zstdCGoProvider struct {
ctx zstd.Ctx
compressionLevel int
}

func newCGoZStdProvider(compressionLevel int) Provider {
return &zstdCGoProvider{
compressionLevel: compressionLevel,
ctx: zstd.NewCtx(),
}
}

func NewZStdProvider() Provider {
return newCGoZStdProvider(zstd.DefaultCompressionLevel)
return newCGoZStdProvider(zstd.DefaultCompression)
}

func (z *zstdCGoProvider) Compress(data []byte) []byte {
return zstd.CompressLevel(nil, data, z.compressionLevel)
out, err := z.ctx.CompressLevel(nil, data, z.compressionLevel)
if err != nil {
log.WithError(err).Fatal("Failed to compress")
}

return out
}

func (z *zstdCGoProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
return zstd.Decompress(nil, compressedData)
return z.ctx.Decompress(nil, compressedData)
}

func (z *zstdCGoProvider) Close() error {
Expand Down