From bd0d1a8e6424e8b38d9a19d23777cc412773a43d Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 11 Jun 2020 17:16:08 -0700 Subject: [PATCH 1/2] Introduced lifecycle for compression providers --- pulsar/consumer_partition.go | 80 ++++++++++++------- pulsar/consumer_partition_test.go | 13 ++- pulsar/internal/batch_builder.go | 17 ++-- pulsar/internal/compression/compression.go | 18 ++--- .../compression/compression_bench_test.go | 40 ++++++++-- .../internal/compression/compression_test.go | 11 +-- pulsar/internal/compression/lz4.go | 27 ++++--- pulsar/internal/compression/noop.go | 15 ++-- pulsar/internal/compression/zlib.go | 12 ++- pulsar/internal/compression/zstd.go | 3 +- pulsar/internal/compression/zstd_cgo.go | 12 ++- pulsar/internal/compression/zstd_go.go | 22 +++-- pulsar/producer_partition.go | 4 + 13 files changed, 174 insertions(+), 100 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 4c40ae9147..ff4cb649a3 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -32,15 +32,6 @@ import ( "github.com/apache/pulsar-client-go/pulsar/internal/pb" ) -var ( - compressionProviders = map[pb.CompressionType]compression.Provider{ - pb.CompressionType_NONE: compression.NoopProvider, - pb.CompressionType_LZ4: compression.Lz4Provider, - pb.CompressionType_ZLIB: compression.ZLibProvider, - pb.CompressionType_ZSTD: compression.ZStdProvider, - } -) - type consumerState int const ( @@ -115,29 +106,32 @@ type partitionConsumer struct { dlq *dlqRouter log *log.Entry + + compressionProviders map[pb.CompressionType]compression.Provider } func newPartitionConsumer(parent Consumer, client *client, options *partitionConsumerOpts, messageCh chan ConsumerMessage, dlq *dlqRouter) (*partitionConsumer, error) { pc := &partitionConsumer{ - state: consumerInit, - parentConsumer: parent, - client: client, - options: options, - topic: options.topic, - name: options.consumerName, - consumerID: client.rpcClient.NewConsumerID(), - partitionIdx: options.partitionIdx, - eventsCh: make(chan interface{}, 3), - queueSize: int32(options.receiverQueueSize), - queueCh: make(chan []*message, options.receiverQueueSize), - startMessageID: options.startMessageID, - connectedCh: make(chan struct{}), - messageCh: messageCh, - closeCh: make(chan struct{}), - clearQueueCh: make(chan func(id *messageID)), - dlq: dlq, - log: log.WithField("topic", options.topic), + state: consumerInit, + parentConsumer: parent, + client: client, + options: options, + topic: options.topic, + name: options.consumerName, + consumerID: client.rpcClient.NewConsumerID(), + partitionIdx: options.partitionIdx, + eventsCh: make(chan interface{}, 3), + queueSize: int32(options.receiverQueueSize), + queueCh: make(chan []*message, options.receiverQueueSize), + startMessageID: options.startMessageID, + connectedCh: make(chan struct{}), + messageCh: messageCh, + closeCh: make(chan struct{}), + clearQueueCh: make(chan func(id *messageID)), + compressionProviders: make(map[pb.CompressionType]compression.Provider), + dlq: dlq, + log: log.WithField("topic", options.topic), } pc.log = pc.log.WithField("name", pc.name).WithField("subscription", options.subscription) pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay) @@ -685,6 +679,10 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) { pc.log.Info("Closed consumer") } + for _, provider := range pc.compressionProviders { + provider.Close() + } + pc.state = consumerClosed pc.conn.DeleteConsumeHandler(pc.consumerID) if pc.nackTracker != nil { @@ -846,11 +844,15 @@ func getPreviousMessage(mid *messageID) *messageID { } func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload internal.Buffer) (internal.Buffer, error) { - provider, ok := compressionProviders[msgMeta.GetCompression()] + provider, ok := pc.compressionProviders[msgMeta.GetCompression()] if !ok { - err := fmt.Errorf("unsupported compression type: %v", msgMeta.GetCompression()) - pc.log.WithError(err).Error("Failed to decompress message.") - return nil, err + var err error + if provider, err = pc.initializeCompressionProvider(msgMeta.GetCompression()); err != nil { + pc.log.WithError(err).Error("Failed to decompress message.") + return nil, err + } + + pc.compressionProviders[msgMeta.GetCompression()] = provider } uncompressed, err := provider.Decompress(payload.ReadableSlice(), int(msgMeta.GetUncompressedSize())) @@ -861,6 +863,22 @@ func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload int return internal.NewBufferWrapper(uncompressed), nil } +func (pc *partitionConsumer) initializeCompressionProvider( + compressionType pb.CompressionType) (compression.Provider, error) { + switch compressionType { + case pb.CompressionType_NONE: + return compression.NewNoopProvider(), nil + case pb.CompressionType_ZLIB: + return compression.NewZLibProvider(), nil + case pb.CompressionType_LZ4: + return compression.NewLz4Provider(), nil + case pb.CompressionType_ZSTD: + return compression.NewZStdProvider(), nil + } + + return nil, fmt.Errorf("unsupported compression type: %v", compressionType) +} + func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData, validationError pb.CommandAck_ValidationError) { pc.log.WithFields(log.Fields{ diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go index 71273c5059..37f6b7efc9 100644 --- a/pulsar/consumer_partition_test.go +++ b/pulsar/consumer_partition_test.go @@ -20,6 +20,9 @@ package pulsar import ( "testing" + "github.com/apache/pulsar-client-go/pulsar/internal/compression" + "github.com/apache/pulsar-client-go/pulsar/internal/pb" + "github.com/stretchr/testify/assert" "github.com/apache/pulsar-client-go/pulsar/internal" @@ -28,8 +31,9 @@ import ( func TestSingleMessageIDNoAckTracker(t *testing.T) { eventsCh := make(chan interface{}, 1) pc := partitionConsumer{ - queueCh: make(chan []*message, 1), - eventsCh: eventsCh, + queueCh: make(chan []*message, 1), + eventsCh: eventsCh, + compressionProviders: make(map[pb.CompressionType]compression.Provider), } headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage) @@ -56,8 +60,9 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) { func TestBatchMessageIDNoAckTracker(t *testing.T) { eventsCh := make(chan interface{}, 1) pc := partitionConsumer{ - queueCh: make(chan []*message, 1), - eventsCh: eventsCh, + queueCh: make(chan []*message, 1), + eventsCh: eventsCh, + compressionProviders: make(map[pb.CompressionType]compression.Provider), } headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1) diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go index 35cdf71f81..78be00ef57 100644 --- a/pulsar/internal/batch_builder.go +++ b/pulsar/internal/batch_builder.go @@ -18,7 +18,6 @@ package internal import ( - "fmt" "time" "github.com/apache/pulsar-client-go/pulsar/internal/compression" @@ -92,10 +91,6 @@ func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, p bb.msgMetadata.Compression = &compressionType } - if !bb.compressionProvider.CanCompress() { - return nil, fmt.Errorf("compression provider %d can only decompress data", compressionType) - } - return bb, nil } @@ -177,16 +172,20 @@ func (bb *BatchBuilder) Flush() (batchData []byte, sequenceID uint64, callbacks return buffer.ReadableSlice(), sequenceID, callbacks } +func (bb *BatchBuilder) Close() error { + return bb.compressionProvider.Close() +} + func getCompressionProvider(compressionType pb.CompressionType) compression.Provider { switch compressionType { case pb.CompressionType_NONE: - return compression.NoopProvider + return compression.NewNoopProvider() case pb.CompressionType_LZ4: - return compression.Lz4Provider + return compression.NewLz4Provider() case pb.CompressionType_ZLIB: - return compression.ZLibProvider + return compression.NewZLibProvider() case pb.CompressionType_ZSTD: - return compression.ZStdProvider + return compression.NewZStdProvider() default: log.Panic("unsupported compression type") return nil diff --git a/pulsar/internal/compression/compression.go b/pulsar/internal/compression/compression.go index ac52092a0b..e45d18ff5a 100644 --- a/pulsar/internal/compression/compression.go +++ b/pulsar/internal/compression/compression.go @@ -17,11 +17,10 @@ package compression +import "io" + // Provider is a interface of compression providers type Provider interface { - // CanCompress checks if the compression method is available under the current version. - CanCompress() bool - // Compress a []byte, the param is a []byte with the uncompressed content. // The reader/writer indexes will not be modified. The return is a []byte // with the compressed content. @@ -31,11 +30,10 @@ type Provider interface { // The compressedData is compressed content, originalSize is the size of the original content. // The return were the result will be passed, if err is nil, the buffer was decompressed, no nil otherwise. Decompress(compressedData []byte, originalSize int) ([]byte, error) -} -var ( - NoopProvider = NewNoopProvider() - ZLibProvider = NewZLibProvider() - Lz4Provider = NewLz4Provider() - ZStdProvider = NewZStdProvider() -) + // Returns a new instance of the same provider, with the same exact configuration + Clone() Provider + + // Close the compressor + io.Closer +} diff --git a/pulsar/internal/compression/compression_bench_test.go b/pulsar/internal/compression/compression_bench_test.go index 8b4b1a73fa..fea2eb81d5 100644 --- a/pulsar/internal/compression/compression_bench_test.go +++ b/pulsar/internal/compression/compression_bench_test.go @@ -22,10 +22,12 @@ import ( "testing" ) -var compressed int +const ( + dataSampleFile = "test_data_sample.txt" +) func testCompression(b *testing.B, provider Provider) { - data, err := ioutil.ReadFile("test_data_sample.txt") + data, err := ioutil.ReadFile(dataSampleFile) if err != nil { b.Error(err) } @@ -35,15 +37,14 @@ func testCompression(b *testing.B, provider Provider) { b.ResetTimer() for i := 0; i < b.N; i++ { - // Use len() to avoid the compiler optimizing the call away - compressed = len(provider.Compress(data)) + provider.Compress(data) b.SetBytes(dataLen) } } func testDecompression(b *testing.B, provider Provider) { // Read data sample file - data, err := ioutil.ReadFile("test_data_sample.txt") + data, err := ioutil.ReadFile(dataSampleFile) if err != nil { b.Error(err) } @@ -61,8 +62,8 @@ func testDecompression(b *testing.B, provider Provider) { } var benchmarkProviders = []testProvider{ - {"zlib", ZLibProvider, nil}, - {"lz4", Lz4Provider, nil}, + {"zlib", NewZLibProvider(), nil}, + {"lz4", NewLz4Provider(), nil}, {"zstd-pure-go-fastest", newPureGoZStdProvider(1), nil}, {"zstd-pure-go-default", newPureGoZStdProvider(2), nil}, {"zstd-pure-go-best", newPureGoZStdProvider(3), nil}, @@ -90,3 +91,28 @@ func BenchmarkDecompression(b *testing.B) { }) } } + +func BenchmarkCompressionParallel(b *testing.B) { + b.ReportAllocs() + + data, err := ioutil.ReadFile(dataSampleFile) + if err != nil { + b.Error(err) + } + + dataLen := int64(len(data)) + b.ResetTimer() + + for _, provider := range benchmarkProviders { + p := provider + b.Run(p.name, func(b *testing.B) { + b.RunParallel(func(pb *testing.PB) { + localProvider := p.provider.Clone() + for pb.Next() { + localProvider.Compress(data) + b.SetBytes(dataLen) + } + }) + }) + } +} diff --git a/pulsar/internal/compression/compression_test.go b/pulsar/internal/compression/compression_test.go index 09fe35925e..7df821ffaa 100644 --- a/pulsar/internal/compression/compression_test.go +++ b/pulsar/internal/compression/compression_test.go @@ -32,19 +32,16 @@ type testProvider struct { } var providers = []testProvider{ - {"zlib", ZLibProvider, []byte{0x78, 0x9c, 0xca, 0x48, 0xcd, 0xc9, 0xc9, 0x07, 0x00, 0x00, 0x00, 0xff, 0xff}}, - {"lz4", Lz4Provider, []byte{0x50, 0x68, 0x65, 0x6c, 0x6c, 0x6f}}, - {"zstd", ZStdProvider, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x20, 0x05, 0x29, 0x00, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f}}, + {"zlib", NewZLibProvider(), []byte{0x78, 0x9c, 0xca, 0x48, 0xcd, 0xc9, 0xc9, 0x07, 0x00, 0x00, 0x00, 0xff, 0xff}}, + {"lz4", NewLz4Provider(), []byte{0x50, 0x68, 0x65, 0x6c, 0x6c, 0x6f}}, + {"zstd", NewZStdProvider(), + []byte{0x28, 0xb5, 0x2f, 0xfd, 0x20, 0x05, 0x29, 0x00, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f}}, } func TestCompression(t *testing.T) { for _, provider := range providers { p := provider t.Run(p.name, func(t *testing.T) { - if !p.provider.CanCompress() { - return - } - hello := []byte("test compression data") compressed := p.provider.Compress(hello) uncompressed, err := p.provider.Decompress(compressed, len(hello)) diff --git a/pulsar/internal/compression/lz4.go b/pulsar/internal/compression/lz4.go index d2520573f6..5a1f0f4bfd 100644 --- a/pulsar/internal/compression/lz4.go +++ b/pulsar/internal/compression/lz4.go @@ -21,24 +21,23 @@ import ( "github.com/pierrec/lz4" ) -type lz4Provider struct{} +type lz4Provider struct { + hashTable []int +} // NewLz4Provider return a interface of Provider. func NewLz4Provider() Provider { - return &lz4Provider{} -} + const tableSize = 1 << 16 -func (lz4Provider) CanCompress() bool { - return true + return &lz4Provider{ + hashTable: make([]int, tableSize), + } } -func (lz4Provider) Compress(data []byte) []byte { - const tableSize = 1 << 16 - hashTable := make([]int, tableSize) - +func (l *lz4Provider) Compress(data []byte) []byte { maxSize := lz4.CompressBlockBound(len(data)) compressed := make([]byte, maxSize) - size, err := lz4.CompressBlock(data, compressed, hashTable) + size, err := lz4.CompressBlock(data, compressed, l.hashTable) if err != nil { panic("Failed to compress") } @@ -75,3 +74,11 @@ func (lz4Provider) Decompress(compressedData []byte, originalSize int) ([]byte, _, err := lz4.UncompressBlock(compressedData, uncompressed) return uncompressed, err } + +func (lz4Provider) Close() error { + return nil +} + +func (lz4Provider) Clone() Provider { + return NewLz4Provider() +} diff --git a/pulsar/internal/compression/noop.go b/pulsar/internal/compression/noop.go index 0d6412328b..48318c5b7f 100644 --- a/pulsar/internal/compression/noop.go +++ b/pulsar/internal/compression/noop.go @@ -19,16 +19,11 @@ package compression type noopProvider struct{} -// NewNoopProvider returns a Provider interface +// NewNoopProvider returns a Provider interface that does not compress the data func NewNoopProvider() Provider { return &noopProvider{} } -// CanCompress always returns true, in the case of noopProvider, noopProvider means no compression. -func (noopProvider) CanCompress() bool { - return true -} - func (noopProvider) Compress(data []byte) []byte { return data } @@ -36,3 +31,11 @@ func (noopProvider) Compress(data []byte) []byte { func (noopProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) { return compressedData, nil } + +func (noopProvider) Close() error { + return nil +} + +func (noopProvider) Clone() Provider { + return NewNoopProvider() +} diff --git a/pulsar/internal/compression/zlib.go b/pulsar/internal/compression/zlib.go index d5c575f0de..fe252061e5 100644 --- a/pulsar/internal/compression/zlib.go +++ b/pulsar/internal/compression/zlib.go @@ -30,10 +30,6 @@ func NewZLibProvider() Provider { return &zlibProvider{} } -func (zlibProvider) CanCompress() bool { - return true -} - func (zlibProvider) Compress(data []byte) []byte { var b bytes.Buffer w := zlib.NewWriter(&b) @@ -65,3 +61,11 @@ func (zlibProvider) Decompress(compressedData []byte, originalSize int) ([]byte, return uncompressed, nil } + +func (zlibProvider) Clone() Provider { + return NewZLibProvider() +} + +func (zlibProvider) Close() error { + return nil +} diff --git a/pulsar/internal/compression/zstd.go b/pulsar/internal/compression/zstd.go index 1dff293a00..a9124034ed 100644 --- a/pulsar/internal/compression/zstd.go +++ b/pulsar/internal/compression/zstd.go @@ -21,6 +21,7 @@ package compression import ( "fmt" + "github.com/klauspost/compress/zstd" ) @@ -33,4 +34,4 @@ func newCGoZStdProvider(compressionLevel int) Provider { // The warning is only shown when running the benchmark with CGO disabled. fmt.Println("WARNING: CGO is disabled, using pure Go implementation of ZStd. Use CGO_ENABLED=1 when running benchmark.") return newPureGoZStdProvider(zstd.SpeedDefault) -} \ No newline at end of file +} diff --git a/pulsar/internal/compression/zstd_cgo.go b/pulsar/internal/compression/zstd_cgo.go index 2eba0a3c2b..bf396a5ac9 100644 --- a/pulsar/internal/compression/zstd_cgo.go +++ b/pulsar/internal/compression/zstd_cgo.go @@ -41,10 +41,6 @@ func NewZStdProvider() Provider { return newCGoZStdProvider(zstd.DefaultCompressionLevel) } -func (*zstdCGoProvider) CanCompress() bool { - return true -} - func (z *zstdCGoProvider) Compress(data []byte) []byte { return zstd.CompressLevel(nil, data, z.compressionLevel) } @@ -52,3 +48,11 @@ func (z *zstdCGoProvider) Compress(data []byte) []byte { func (z *zstdCGoProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) { return zstd.Decompress(nil, compressedData) } + +func (z *zstdCGoProvider) Close() error { + return nil +} + +func (z *zstdCGoProvider) Clone() Provider { + return newCGoZStdProvider(z.compressionLevel) +} diff --git a/pulsar/internal/compression/zstd_go.go b/pulsar/internal/compression/zstd_go.go index da3004a943..f9839c57bc 100644 --- a/pulsar/internal/compression/zstd_go.go +++ b/pulsar/internal/compression/zstd_go.go @@ -23,21 +23,20 @@ import ( ) type zstdProvider struct { - encoder *zstd.Encoder - decoder *zstd.Decoder + compressionLevel zstd.EncoderLevel + encoder *zstd.Encoder + decoder *zstd.Decoder } func newPureGoZStdProvider(compressionLevel zstd.EncoderLevel) Provider { - p := &zstdProvider{} + p := &zstdProvider{ + compressionLevel: compressionLevel, + } p.encoder, _ = zstd.NewWriter(nil, zstd.WithEncoderLevel(compressionLevel)) p.decoder, _ = zstd.NewReader(nil) return p } -func (p *zstdProvider) CanCompress() bool { - return true -} - func (p *zstdProvider) Compress(data []byte) []byte { return p.encoder.EncodeAll(data, []byte{}) } @@ -49,3 +48,12 @@ func (p *zstdProvider) Decompress(compressedData []byte, originalSize int) (dst } return } + +func (p *zstdProvider) Close() error { + p.decoder.Close() + return p.encoder.Close() +} + +func (p *zstdProvider) Clone() Provider { + return newPureGoZStdProvider(p.compressionLevel) +} diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 0f3d1983bc..02c4b9943f 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -463,6 +463,10 @@ func (p *partitionProducer) internalClose(req *closeProducer) { p.log.Info("Closed producer") } + if err = p.batchBuilder.Close(); err != nil { + p.log.WithError(err).Warn("Failed to close batch builder") + } + atomic.StoreInt32(&p.state, producerClosed) p.cnx.UnregisterListener(p.producerID) p.batchFlushTicker.Stop() From 4f93fb80322c92338bcd4c40ea987380d51d8b0e Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 17 Jun 2020 16:48:48 -0700 Subject: [PATCH 2/2] Fixed mocked test --- pulsar/consumer_partition_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go index 37f6b7efc9..5ed1e061e6 100644 --- a/pulsar/consumer_partition_test.go +++ b/pulsar/consumer_partition_test.go @@ -89,8 +89,9 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) { func TestBatchMessageIDWithAckTracker(t *testing.T) { eventsCh := make(chan interface{}, 1) pc := partitionConsumer{ - queueCh: make(chan []*message, 1), - eventsCh: eventsCh, + queueCh: make(chan []*message, 1), + eventsCh: eventsCh, + compressionProviders: make(map[pb.CompressionType]compression.Provider), } headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)