From 5c34f4d7afddaa4ce2f4881762bef7691f7bc16b Mon Sep 17 00:00:00 2001 From: axfor Date: Fri, 23 Jun 2023 17:43:50 +0800 Subject: [PATCH] Squashed commit of the following: commit cfef09cd471b7a8f749300748c4da1ec277b46c7 Merge: a5cbf94f f8d219a1 Author: axfor Date: Sun Mar 26 14:24:42 2023 +0800 Merge branch 'upgrade-sarama-v1.38.1' into upgrade-sarama-v1.38.1 commit a5cbf94f2711738d7c0bf01756368928f174d5b1 Merge: c90f0706 13fa044f Author: axfor Date: Sun Mar 26 14:23:35 2023 +0800 Merge branch 'main' into upgrade-sarama-v1.38.1 commit f8d219a14d0c26fcf609c84ee6e800d84e67e725 Merge: 46f63683 13fa044f Author: axx Date: Sun Mar 26 14:23:11 2023 +0800 Merge branch 'main' into upgrade-sarama-v1.38.1 commit 46f63683837b468af4bd7d79fbcdc7ff34f6d13e Merge: 6d75d036 4f231b6d Author: axx Date: Sat Mar 25 15:29:51 2023 +0800 Merge branch 'main' into upgrade-sarama-v1.38.1 commit 6d75d036776cb66c791ac771621cfe8b7c14027b Merge: c90f0706 45d2de23 Author: axx Date: Thu Mar 23 18:39:50 2023 +0800 Merge branch 'main' into upgrade-sarama-v1.38.1 commit c90f0706312036b5cbffe89f02e09595cd508ac2 Author: axfor Date: Thu Mar 23 01:10:54 2023 +0800 fix unnecessary assignment to the blank identifier (gosimple) Signed-off-by: axfor commit 90d74dd03d9d5192a22b6378ff93bed2853a1333 Author: axfor Date: Thu Mar 23 00:16:07 2023 +0800 fix some tips Signed-off-by: axfor commit f0a2d53fbade1b40a60f6d76dbb25d3f0e19146c Author: axfor Date: Wed Mar 22 23:55:29 2023 +0800 add some comments Signed-off-by: axfor commit 170f77da1b1109ffdcbdc189d0fe11eb5da8fb1c Author: axfor Date: Wed Mar 22 23:27:55 2023 +0800 Fixed the issue of repeatedly closing ready Signed-off-by: axfor commit e74b66b49c4368f43ca5de5c6bc9c40562f062fe Author: axfor Date: Wed Mar 22 22:07:34 2023 +0800 The consumer interface is changed to a minimum subset Signed-off-by: axfor commit f8863b5e500f6480438eca843032a7701436c155 Merge: 2703cb92 6ab3f018 Author: axfor Date: Wed Mar 22 21:38:09 2023 +0800 Merge branch 'main' into upgrade-sarama-v1.38.1 Signed-off-by: axfor commit 2703cb929188b9c52d1b4cade6320817e2e63942 Merge: 810629b1 66e66106 Author: axfor Date: Wed Mar 22 21:30:14 2023 +0800 Merge branch 'upgrade-sarama-v1.38.1' into upgrade-sarama-v1.38.1 commit 810629b18a4de021fd0d68e6d12fac772872efed Author: axfor Date: Wed Mar 22 21:29:40 2023 +0800 add func ready of consumer and remove commitInterval config Signed-off-by: axfor commit 66e66106f4edfc09f1a62903fe80fd24cbc37b89 Merge: ec4e958c 06c59e78 Author: axx Date: Sun Mar 19 16:29:14 2023 +0800 Merge branch 'main' into upgrade-sarama-v1.38.1 commit ec4e958c613badbbecc825eba30fca306ac39396 Author: axfor Date: Sun Mar 19 04:16:35 2023 +0800 Remove the Consumer's partitionMapLock field and change the partitionsHeld type from int64 to atomic.int64 Signed-off-by: axfor commit 15568792f71aba04b81d2621f67c5a40e699c441 Author: axfor Date: Sun Mar 19 02:07:00 2023 +0800 fixed some typoe Signed-off-by: axfor commit 28dc4966ce39e703802a6056aa95534cc30a7a4a Merge: 7294b58d c46d6ca7 Author: axfor Date: Sun Mar 19 01:58:04 2023 +0800 Merge branch 'upgrade-sarama-v1.38.1' into upgrade-sarama-v1.38.1 commit 7294b58d182458b69cf1e5d9d9a3d7fa18503111 Author: axfor Date: Sun Mar 19 01:56:53 2023 +0800 fixed some err of integration test Signed-off-by: axfor commit c46d6ca74f54e62c52224a46751bcc9141dfc926 Merge: 90ddffc6 43c29a0e Author: axx Date: Sat Mar 18 22:15:21 2023 +0800 Merge branch 'main' into upgrade-sarama-v1.38.1 commit 90ddffc6a80a5171f0e5b3a6be519749f9f81491 Author: axfor Date: Sat Mar 18 22:11:12 2023 +0800 Improved log printing when a new ConsumerGroup object fails Signed-off-by: axfor commit 655a6ddb205dce6019070332ce52ba57d9e1f1b6 Merge: 3a82b7cf 18843def Author: axfor Date: Sun Mar 12 17:13:54 2023 +0800 Merge branch 'main' code Signed-off-by: axfor commit 3a82b7cf29af9459558a5e362a96ccae4e4d0bf4 Author: axfor Date: Sun Mar 12 17:09:42 2023 +0800 update some comment Signed-off-by: axfor commit e077b0915d6ec61f5a45ee54072e69dfaefef569 Author: axfor Date: Sun Mar 12 16:43:57 2023 +0800 improve some of the tests Signed-off-by: axfor commit 574810a0fda9c1d99a3606a55b6ecda42588996d Author: Clement Date: Sun Mar 12 03:02:00 2023 +0800 [chore]: Replace pkg/multierror with standard errors.Join (#4293) Resolves #4292 Signed-off-by: Clement Signed-off-by: axfor commit b7ab7fd31a69af287f29df06d7d51cf94cb39914 Author: axfor Date: Sun Mar 12 00:06:22 2023 +0800 Bump github.com/Shopify/sarama from v1.33.0 to v1.38.1 Signed-off-by: axfor --- cmd/ingester/app/consumer/consumer.go | 146 ++++++++---- cmd/ingester/app/consumer/consumer_test.go | 219 +----------------- cmd/ingester/app/consumer/mocks/Consumer.go | 17 +- .../app/consumer/processor_factory.go | 15 +- .../app/consumer/processor_factory_test.go | 39 +--- go.mod | 9 +- go.sum | 62 +---- pkg/kafka/consumer/config.go | 29 ++- plugin/storage/integration/kafka_test.go | 3 + 9 files changed, 147 insertions(+), 392 deletions(-) diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go index 329c21141931..acdc4d406c23 100644 --- a/cmd/ingester/app/consumer/consumer.go +++ b/cmd/ingester/app/consumer/consumer.go @@ -15,11 +15,13 @@ package consumer import ( + "context" + "strings" "sync" + "sync/atomic" "time" "github.com/Shopify/sarama" - sc "github.com/bsm/sarama-cluster" "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/ingester/app/processor" @@ -46,16 +48,15 @@ type Consumer struct { deadlockDetector deadlockDetector - partitionIDToState map[int32]*consumerState - partitionMapLock sync.Mutex - partitionsHeld int64 + partitionsHeld atomic.Int64 partitionsHeldGauge metrics.Gauge - doneWg sync.WaitGroup -} + doneWg sync.WaitGroup + ready chan struct{} + readyCloseLock sync.Mutex -type consumerState struct { - partitionConsumer sc.PartitionConsumer + topic string + cancel context.CancelFunc } // New is a constructor for a Consumer @@ -67,25 +68,37 @@ func New(params Params) (*Consumer, error) { internalConsumer: params.InternalConsumer, processorFactory: params.ProcessorFactory, deadlockDetector: deadlockDetector, - partitionIDToState: make(map[int32]*consumerState), partitionsHeldGauge: partitionsHeldGauge(params.MetricsFactory), + ready: make(chan struct{}), + topic: params.ProcessorFactory.topic, }, nil } // Start begins consuming messages in a go routine func (c *Consumer) Start() { c.deadlockDetector.start() + ctx, cancel := context.WithCancel(context.Background()) + c.cancel = cancel + c.doneWg.Add(1) + go func() { - c.logger.Info("Starting main loop") - for pc := range c.internalConsumer.Partitions() { - c.partitionMapLock.Lock() - c.partitionIDToState[pc.Partition()] = &consumerState{partitionConsumer: pc} - c.partitionMapLock.Unlock() - c.partitionMetrics(pc.Partition()).startCounter.Inc(1) - - c.doneWg.Add(2) - go c.handleMessages(pc) - go c.handleErrors(pc.Partition(), pc.Errors()) + defer c.doneWg.Done() + for { + select { + case <-ctx.Done(): + c.logger.Error("ctx canceld") + return + default: + c.logger.Info("Topic", zap.Strings("topic", strings.Split(c.topic, ","))) + if err := c.internalConsumer.Consume(ctx, strings.Split(c.topic, ","), c); err != nil { + c.logger.Error("Error from consumer", zap.Error(err)) + } + // check if context was cancelled, signaling that the consumer should stop + if ctx.Err() != nil { + return + } + c.ready = make(chan struct{}) + } } }() } @@ -98,6 +111,9 @@ func (c *Consumer) Close() error { c.logger.Debug("Closing deadlock detector") c.deadlockDetector.close() + if c.cancel != nil { + c.cancel() + } c.logger.Debug("Waiting for messages and errors to be handled") c.doneWg.Wait() @@ -105,44 +121,75 @@ func (c *Consumer) Close() error { return err } -// handleMessages handles incoming Kafka messages on a channel -func (c *Consumer) handleMessages(pc sc.PartitionConsumer) { - c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition())) - c.partitionMapLock.Lock() - c.partitionsHeld++ - c.partitionsHeldGauge.Update(c.partitionsHeld) - c.partitionMapLock.Unlock() +// Ready is consumer running +func (c *Consumer) Ready() { + <-c.ready +} + +// Setup is run at the beginning of a new session, before ConsumeClaim +func (c *Consumer) Setup(sarama.ConsumerGroupSession) error { + // Mark the consumer as ready + + c.readyCloseLock.Lock() + defer c.readyCloseLock.Unlock() + + select { + case <-c.ready: + default: + close(c.ready) + } + + return nil +} + +// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited +func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error { + return nil +} + +// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). +func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + c.partitionMetrics(claim.Partition()).startCounter.Inc(1) + + c.doneWg.Add(2) + go c.handleErrors(claim.Partition(), c.internalConsumer.Errors()) + c.handleMessages(session, claim) + return nil +} + +// handleMessages starting message handler +func (c *Consumer) handleMessages(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) { + c.logger.Info("Starting message handler", zap.Int32("partition", claim.Partition())) + + c.partitionsHeld.Add(1) + c.partitionsHeldGauge.Update(c.partitionsHeld.Load()) + defer func() { - c.closePartition(pc) - c.partitionMapLock.Lock() - c.partitionsHeld-- - c.partitionsHeldGauge.Update(c.partitionsHeld) - c.partitionMapLock.Unlock() + c.closePartition(claim) + c.partitionsHeld.Add(-1) + c.partitionsHeldGauge.Update(c.partitionsHeld.Load()) c.doneWg.Done() }() - msgMetrics := c.newMsgMetrics(pc.Partition()) + msgMetrics := c.newMsgMetrics(claim.Partition()) var msgProcessor processor.SpanProcessor - deadlockDetector := c.deadlockDetector.startMonitoringForPartition(pc.Partition()) + deadlockDetector := c.deadlockDetector.startMonitoringForPartition(claim.Partition()) defer deadlockDetector.close() for { select { - case msg, ok := <-pc.Messages(): - if !ok { - c.logger.Info("Message channel closed. ", zap.Int32("partition", pc.Partition())) - return - } + case msg := <-claim.Messages(): + c.logger.Debug("Got msg", zap.Any("msg", msg)) msgMetrics.counter.Inc(1) msgMetrics.offsetGauge.Update(msg.Offset) - msgMetrics.lagGauge.Update(pc.HighWaterMarkOffset() - msg.Offset - 1) + msgMetrics.lagGauge.Update(claim.HighWaterMarkOffset() - msg.Offset - 1) deadlockDetector.incrementMsgCount() if msgProcessor == nil { - msgProcessor = c.processorFactory.new(pc.Partition(), msg.Offset-1) + msgProcessor = c.processorFactory.new(session, claim, msg.Offset-1) defer msgProcessor.Close() } @@ -151,22 +198,27 @@ func (c *Consumer) handleMessages(pc sc.PartitionConsumer) { c.logger.Error("Failed to process a Kafka message", zap.Error(err), zap.Int32("partition", msg.Partition), zap.Int64("offset", msg.Offset)) } + // Should return when `session.Context()` is done. + // If not, will raise `ErrRebalanceInProgress` or `read tcp :: i/o timeout` when kafka rebalance. see: + // https://github.com/Shopify/sarama/issues/1192 + case <-session.Context().Done(): + c.logger.Info("Session done", zap.Int32("partition", claim.Partition())) + return case <-deadlockDetector.closePartitionChannel(): - c.logger.Info("Closing partition due to inactivity", zap.Int32("partition", pc.Partition())) + c.logger.Info("Closing partition due to inactivity", zap.Int32("partition", claim.Partition())) return } } } -func (c *Consumer) closePartition(partitionConsumer sc.PartitionConsumer) { - c.logger.Info("Closing partition consumer", zap.Int32("partition", partitionConsumer.Partition())) - partitionConsumer.Close() // blocks until messages channel is drained - c.partitionMetrics(partitionConsumer.Partition()).closeCounter.Inc(1) - c.logger.Info("Closed partition consumer", zap.Int32("partition", partitionConsumer.Partition())) +// closePartition close partition of consumer +func (c *Consumer) closePartition(claim sarama.ConsumerGroupClaim) { + c.logger.Info("Closing partition consumer", zap.Int32("partition", claim.Partition())) + c.partitionMetrics(claim.Partition()).closeCounter.Inc(1) } // handleErrors handles incoming Kafka consumer errors on a channel -func (c *Consumer) handleErrors(partition int32, errChan <-chan *sarama.ConsumerError) { +func (c *Consumer) handleErrors(partition int32, errChan <-chan error) { c.logger.Info("Starting error handler", zap.Int32("partition", partition)) defer c.doneWg.Done() diff --git a/cmd/ingester/app/consumer/consumer_test.go b/cmd/ingester/app/consumer/consumer_test.go index 731218b4b196..c32f5ee4747b 100644 --- a/cmd/ingester/app/consumer/consumer_test.go +++ b/cmd/ingester/app/consumer/consumer_test.go @@ -15,30 +15,15 @@ package consumer import ( - "errors" - "fmt" - "sync" "testing" - "time" - "github.com/Shopify/sarama" - smocks "github.com/Shopify/sarama/mocks" - cluster "github.com/bsm/sarama-cluster" + "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "go.uber.org/zap" kmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks" - "github.com/jaegertracing/jaeger/cmd/ingester/app/processor" - pmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/processor/mocks" - "github.com/jaegertracing/jaeger/internal/metricstest" - "github.com/jaegertracing/jaeger/pkg/kafka/consumer" - "github.com/jaegertracing/jaeger/pkg/metrics" ) //go:generate mockery -dir ../../../../pkg/kafka/config/ -name Consumer -//go:generate mockery -dir ../../../../../vendor/github.com/bsm/sarama-cluster/ -name PartitionConsumer const ( topic = "morekuzambu" @@ -52,65 +37,6 @@ func TestConstructor(t *testing.T) { assert.NotNil(t, newConsumer) } -// partitionConsumerWrapper wraps a Sarama partition consumer into a Sarama cluster partition consumer -type partitionConsumerWrapper struct { - topic string - partition int32 - - sarama.PartitionConsumer -} - -func (s partitionConsumerWrapper) Partition() int32 { - return s.partition -} - -func (s partitionConsumerWrapper) Topic() string { - return s.topic -} - -func newSaramaClusterConsumer(saramaPartitionConsumer sarama.PartitionConsumer, mc *smocks.PartitionConsumer) *kmocks.Consumer { - pcha := make(chan cluster.PartitionConsumer, 1) - pcha <- &partitionConsumerWrapper{ - topic: topic, - partition: partition, - PartitionConsumer: saramaPartitionConsumer, - } - saramaClusterConsumer := &kmocks.Consumer{} - saramaClusterConsumer.On("Partitions").Return((<-chan cluster.PartitionConsumer)(pcha)) - saramaClusterConsumer.On("Close").Return(nil).Run(func(args mock.Arguments) { - mc.Close() - }) - saramaClusterConsumer.On("MarkPartitionOffset", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - return saramaClusterConsumer -} - -func newConsumer( - t *testing.T, - metricsFactory metrics.Factory, - topic string, - processor processor.SpanProcessor, - consumer consumer.Consumer, -) *Consumer { - logger, _ := zap.NewDevelopment() - consumerParams := Params{ - MetricsFactory: metricsFactory, - Logger: logger, - InternalConsumer: consumer, - ProcessorFactory: ProcessorFactory{ - topic: topic, - consumer: consumer, - metricsFactory: metricsFactory, - logger: logger, - baseProcessor: processor, - parallelism: 1, - }, - } - - c, err := New(consumerParams) - require.NoError(t, err) - return c -} - func TestSaramaConsumerWrapper_MarkPartitionOffset(t *testing.T) { sc := &kmocks.Consumer{} metadata := "meatbag" @@ -118,146 +44,3 @@ func TestSaramaConsumerWrapper_MarkPartitionOffset(t *testing.T) { sc.MarkPartitionOffset(topic, partition, msgOffset, metadata) sc.AssertCalled(t, "MarkPartitionOffset", topic, partition, msgOffset, metadata) } - -func TestSaramaConsumerWrapper_start_Messages(t *testing.T) { - localFactory := metricstest.NewFactory(0) - - msg := &sarama.ConsumerMessage{} - - isProcessed := sync.WaitGroup{} - isProcessed.Add(1) - mp := &pmocks.SpanProcessor{} - mp.On("Process", saramaMessageWrapper{msg}).Return(func(msg processor.Message) error { - isProcessed.Done() - return nil - }) - - saramaConsumer := smocks.NewConsumer(t, &sarama.Config{}) - mc := saramaConsumer.ExpectConsumePartition(topic, partition, msgOffset) - mc.ExpectMessagesDrainedOnClose() - - saramaPartitionConsumer, e := saramaConsumer.ConsumePartition(topic, partition, msgOffset) - require.NoError(t, e) - - undertest := newConsumer(t, localFactory, topic, mp, newSaramaClusterConsumer(saramaPartitionConsumer, mc)) - - undertest.partitionIDToState = map[int32]*consumerState{ - partition: { - partitionConsumer: &partitionConsumerWrapper{ - topic: topic, - partition: partition, - PartitionConsumer: &kmocks.PartitionConsumer{}, - }, - }, - } - - undertest.Start() - - mc.YieldMessage(msg) - isProcessed.Wait() - - localFactory.AssertGaugeMetrics(t, metricstest.ExpectedMetric{ - Name: "sarama-consumer.partitions-held", - Value: 1, - }) - - mp.AssertExpectations(t) - // Ensure that the partition consumer was updated in the map - assert.Equal(t, saramaPartitionConsumer.HighWaterMarkOffset(), - undertest.partitionIDToState[partition].partitionConsumer.HighWaterMarkOffset()) - undertest.Close() - - localFactory.AssertCounterMetrics(t, metricstest.ExpectedMetric{ - Name: "sarama-consumer.partitions-held", - Value: 0, - }) - - partitionTag := map[string]string{"partition": fmt.Sprint(partition)} - localFactory.AssertCounterMetrics(t, metricstest.ExpectedMetric{ - Name: "sarama-consumer.messages", - Tags: partitionTag, - Value: 1, - }) - localFactory.AssertGaugeMetrics(t, metricstest.ExpectedMetric{ - Name: "sarama-consumer.current-offset", - Tags: partitionTag, - Value: int(msgOffset), - }) - localFactory.AssertGaugeMetrics(t, metricstest.ExpectedMetric{ - Name: "sarama-consumer.offset-lag", - Tags: partitionTag, - // Prior to sarama v1.31.0 this would be 0, it's unclear why this changed. - // v=1 seems to be correct because high watermark in mock is incremented upon - // consuming the message, and func HighWaterMarkOffset() returns internal value - // (already incremented) + 1, so the difference is always 2, and we then - // subtract 1 from it. - Value: 1, - }) - localFactory.AssertCounterMetrics(t, metricstest.ExpectedMetric{ - Name: "sarama-consumer.partition-start", - Tags: partitionTag, - Value: 1, - }) -} - -func TestSaramaConsumerWrapper_start_Errors(t *testing.T) { - localFactory := metricstest.NewFactory(0) - - saramaConsumer := smocks.NewConsumer(t, &sarama.Config{}) - mc := saramaConsumer.ExpectConsumePartition(topic, partition, msgOffset) - mc.ExpectErrorsDrainedOnClose() - - saramaPartitionConsumer, e := saramaConsumer.ConsumePartition(topic, partition, msgOffset) - require.NoError(t, e) - - undertest := newConsumer(t, localFactory, topic, &pmocks.SpanProcessor{}, newSaramaClusterConsumer(saramaPartitionConsumer, mc)) - - undertest.Start() - mc.YieldError(errors.New("Daisy, Daisy")) - - for i := 0; i < 1000; i++ { - time.Sleep(time.Millisecond) - - c, _ := localFactory.Snapshot() - if len(c) == 0 { - continue - } - - partitionTag := map[string]string{"partition": fmt.Sprint(partition)} - localFactory.AssertCounterMetrics(t, metricstest.ExpectedMetric{ - Name: "sarama-consumer.errors", - Tags: partitionTag, - Value: 1, - }) - undertest.Close() - return - } - - t.Fail() -} - -func TestHandleClosePartition(t *testing.T) { - metricsFactory := metricstest.NewFactory(0) - - mp := &pmocks.SpanProcessor{} - saramaConsumer := smocks.NewConsumer(t, &sarama.Config{}) - mc := saramaConsumer.ExpectConsumePartition(topic, partition, msgOffset) - mc.ExpectErrorsDrainedOnClose() - saramaPartitionConsumer, e := saramaConsumer.ConsumePartition(topic, partition, msgOffset) - require.NoError(t, e) - - undertest := newConsumer(t, metricsFactory, topic, mp, newSaramaClusterConsumer(saramaPartitionConsumer, mc)) - undertest.deadlockDetector = newDeadlockDetector(metricsFactory, undertest.logger, 200*time.Millisecond) - undertest.Start() - defer undertest.Close() - - for i := 0; i < 10; i++ { - undertest.deadlockDetector.allPartitionsDeadlockDetector.incrementMsgCount() // Don't trigger panic on all partitions detector - time.Sleep(100 * time.Millisecond) - c, _ := metricsFactory.Snapshot() - if c["sarama-consumer.partition-close|partition=316"] == 1 { - return - } - } - assert.Fail(t, "Did not close partition") -} diff --git a/cmd/ingester/app/consumer/mocks/Consumer.go b/cmd/ingester/app/consumer/mocks/Consumer.go index a40cf181b81e..826ff134aecc 100644 --- a/cmd/ingester/app/consumer/mocks/Consumer.go +++ b/cmd/ingester/app/consumer/mocks/Consumer.go @@ -15,10 +15,11 @@ // limitations under the License. package mocks - -import cluster "github.com/bsm/sarama-cluster" - -import mock "github.com/stretchr/testify/mock" + +import ( + "github.com/Shopify/sarama" + mock "github.com/stretchr/testify/mock" +) // Consumer is an autogenerated mock type for the Consumer type type Consumer struct { @@ -45,15 +46,15 @@ func (_m *Consumer) MarkPartitionOffset(topic string, partition int32, offset in } // Partitions provides a mock function with given fields: -func (_m *Consumer) Partitions() <-chan cluster.PartitionConsumer { +func (_m *Consumer) Partitions() <-chan sarama.ConsumerGroup { ret := _m.Called() - var r0 <-chan cluster.PartitionConsumer - if rf, ok := ret.Get(0).(func() <-chan cluster.PartitionConsumer); ok { + var r0 <-chan sarama.ConsumerGroup + if rf, ok := ret.Get(0).(func() <-chan sarama.ConsumerGroup); ok { r0 = rf() } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(<-chan cluster.PartitionConsumer) + r0 = ret.Get(0).(<-chan sarama.ConsumerGroup) } } diff --git a/cmd/ingester/app/consumer/processor_factory.go b/cmd/ingester/app/consumer/processor_factory.go index 3bef57b8f6ce..2e243941ace4 100644 --- a/cmd/ingester/app/consumer/processor_factory.go +++ b/cmd/ingester/app/consumer/processor_factory.go @@ -19,6 +19,7 @@ import ( "go.uber.org/zap" + "github.com/Shopify/sarama" "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/offset" "github.com/jaegertracing/jaeger/cmd/ingester/app/processor" "github.com/jaegertracing/jaeger/cmd/ingester/app/processor/decorator" @@ -61,14 +62,18 @@ func NewProcessorFactory(params ProcessorFactoryParams) (*ProcessorFactory, erro }, nil } -func (c *ProcessorFactory) new(partition int32, minOffset int64) processor.SpanProcessor { - c.logger.Info("Creating new processors", zap.Int32("partition", partition)) +func (c *ProcessorFactory) new( + session sarama.ConsumerGroupSession, + claim sarama.ConsumerGroupClaim, + minOffset int64) processor.SpanProcessor { - markOffset := func(offset int64) { - c.consumer.MarkPartitionOffset(c.topic, partition, offset, "") + c.logger.Info("Creating new processors", zap.Int32("partition", claim.Partition())) + + markOffsetFunc := func(offset int64) { + session.MarkOffset(claim.Topic(), claim.Partition(), offset, "") } - om := offset.NewManager(minOffset, markOffset, partition, c.metricsFactory) + om := offset.NewManager(minOffset, markOffsetFunc, claim.Partition(), c.metricsFactory) retryProcessor := decorator.NewRetryingProcessor(c.metricsFactory, c.baseProcessor, c.retryOptions...) cp := NewCommittingProcessor(retryProcessor, om) diff --git a/cmd/ingester/app/consumer/processor_factory_test.go b/cmd/ingester/app/consumer/processor_factory_test.go index c2b47f9e0d38..a407a33d3c55 100644 --- a/cmd/ingester/app/consumer/processor_factory_test.go +++ b/cmd/ingester/app/consumer/processor_factory_test.go @@ -16,15 +16,9 @@ package consumer import ( "testing" - "time" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "go.uber.org/zap" - - kmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks" "github.com/jaegertracing/jaeger/cmd/ingester/app/processor/mocks" - "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/stretchr/testify/assert" ) func Test_NewFactory(t *testing.T) { @@ -34,37 +28,6 @@ func Test_NewFactory(t *testing.T) { assert.NotNil(t, newFactory) } -func Test_new(t *testing.T) { - mockConsumer := &kmocks.Consumer{} - mockConsumer.On("MarkPartitionOffset", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - - topic := "coelacanth" - partition := int32(21) - offset := int64(555) - - sp := &mocks.SpanProcessor{} - sp.On("Process", mock.Anything).Return(nil) - - pf := ProcessorFactory{ - topic: topic, - consumer: mockConsumer, - metricsFactory: metrics.NullFactory, - logger: zap.NewNop(), - baseProcessor: sp, - parallelism: 1, - } - - processor := pf.new(partition, offset) - msg := &kmocks.Message{} - msg.On("Offset").Return(offset + 1) - processor.Process(msg) - - // This sleep is greater than offset manager's resetInterval to allow it a chance to - // call MarkPartitionOffset. - time.Sleep(150 * time.Millisecond) - mockConsumer.AssertCalled(t, "MarkPartitionOffset", topic, partition, offset+1, "") -} - type fakeService struct { startCalled bool closeCalled bool diff --git a/go.mod b/go.mod index ad32b2de9ac9..0aad821f2693 100644 --- a/go.mod +++ b/go.mod @@ -4,9 +4,8 @@ go 1.20 require ( github.com/HdrHistogram/hdrhistogram-go v1.1.2 - github.com/Shopify/sarama v1.33.0 + github.com/Shopify/sarama v1.38.1 github.com/apache/thrift v0.18.1 - github.com/bsm/sarama-cluster v2.1.13+incompatible github.com/crossdock/crossdock-go v0.0.0-20160816171116-049aabb0122b github.com/dgraph-io/badger/v3 v3.2103.5 github.com/fsnotify/fsnotify v1.6.0 @@ -87,7 +86,7 @@ require ( github.com/dgraph-io/ristretto v0.1.1 // indirect github.com/dustin/go-humanize v1.0.0 // indirect github.com/eapache/go-resiliency v1.3.0 // indirect - github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/fatih/color v1.13.0 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect @@ -132,11 +131,9 @@ require ( github.com/mostynb/go-grpc-compression v1.1.18 // indirect github.com/oklog/run v1.1.0 // indirect github.com/oklog/ulid v1.3.1 // indirect - github.com/onsi/ginkgo v1.16.4 // indirect - github.com/onsi/gomega v1.13.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.79.0 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect - github.com/pierrec/lz4 v2.6.1+incompatible // indirect + github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect diff --git a/go.sum b/go.sum index e8dd8b242344..fd3b92b0a172 100644 --- a/go.sum +++ b/go.sum @@ -48,10 +48,9 @@ github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= -github.com/Shopify/sarama v1.33.0 h1:2K4mB9M4fo46sAM7t6QTsmSO8dLX1OqznLM7vn3OjZ8= -github.com/Shopify/sarama v1.33.0/go.mod h1:lYO7LwEBkE0iAeTl94UfPSrDaavFzSFlmn+5isARATQ= -github.com/Shopify/toxiproxy/v2 v2.3.0 h1:62YkpiP4bzdhKMH+6uC5E95y608k3zDwdzuBMsnn3uQ= -github.com/Shopify/toxiproxy/v2 v2.3.0/go.mod h1:KvQTtB6RjCJY4zqNJn7C7JDFgsG5uoHYDirfUfpIm0c= +github.com/Shopify/sarama v1.38.1 h1:lqqPUPQZ7zPqYlWpTh+LQ9bhYNu2xJL6k1SJN4WVe2A= +github.com/Shopify/sarama v1.38.1/go.mod h1:iwv9a67Ha8VNa+TifujYoWGxWnu2kNVAQdSdZ4X2o5g= +github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc= github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= @@ -93,8 +92,6 @@ github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYE github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= -github.com/bsm/sarama-cluster v2.1.13+incompatible h1:bqU3gMJbWZVxLZ9PGWVKP05yOmFXUlfw61RBwuE3PYU= -github.com/bsm/sarama-cluster v2.1.13+incompatible/go.mod h1:r7ao+4tTNXvWm+VRpRJchr2kQhqxgmAp2iEX5W96gMM= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -122,7 +119,6 @@ github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3Ee github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= -github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -139,11 +135,10 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczC github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= -github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-resiliency v1.3.0 h1:RRL0nge+cWGlxXbUzJ7yMcq6w2XBEr19dCN6HECGaT0= github.com/eapache/go-resiliency v1.3.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= -github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= -github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 h1:8yY/I9ndfrgrXUbOGObLHKBR4Fl3nZXwM2c7OYTT8hM= +github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -166,8 +161,6 @@ github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBd github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= -github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= -github.com/frankban/quicktest v1.14.2/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps= github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= @@ -229,7 +222,6 @@ github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+ github.com/go-openapi/validate v0.22.1 h1:G+c2ub6q47kfX1sOBLwIQwzBVt8qmOAARyo/9Fqs9NU= github.com/go-openapi/validate v0.22.1/go.mod h1:rjnrwK57VJ7A8xqfpAOEKRH8yQSGUriMu5/zuPSQ1hg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/gobuffalo/attrs v0.0.0-20190224210810-a9411de4debd/go.mod h1:4duuawTqi2wkkpB4ePgWMaai6/Kc6WEz83bhFwpHzj0= github.com/gobuffalo/depgen v0.0.0-20190329151759-d478694a28d3/go.mod h1:3STtPUQYuzV0gBVOY3vy6CfMm/ljR4pABfrTeHNLHUY= @@ -407,7 +399,6 @@ github.com/hashicorp/yamux v0.0.0-20190923154419-df201c70410d h1:W+SIwDdl3+jXWei github.com/hashicorp/yamux v0.0.0-20190923154419-df201c70410d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= github.com/hjson/hjson-go/v4 v4.0.0 h1:wlm6IYYqHjOdXH1gHev4VoXCaW20HdQAGCxdOEEg2cs= github.com/hjson/hjson-go/v4 v4.0.0/go.mod h1:KaYt3bTw3zhBjYqnXkYywcYctk0A2nxeEFTse3rH13E= -github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= @@ -417,12 +408,10 @@ github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFK github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= -github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= -github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc= github.com/jcmturner/gokrb5/v8 v8.4.3 h1:iTonLeSJOn7MVUtyMT+arAn5AKAPrkilzhGw8wE/Tq8= github.com/jcmturner/gokrb5/v8 v8.4.3/go.mod h1:dqRwJGXznQrzw6cWmyo6kH+E7jksEQG/CyVWsJEsJO0= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= @@ -452,7 +441,6 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.15.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI= github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/knadh/koanf v1.5.0 h1:q2TSd/3Pyc/5yP9ldIrSdIz26MCcyNQzW0pEAugLPNs= @@ -464,8 +452,6 @@ github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -534,9 +520,6 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnuG+zWp9L0Uk= -github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= -github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= -github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA= github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU= @@ -544,15 +527,6 @@ github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olivere/elastic v6.2.37+incompatible h1:UfSGJem5czY+x/LqxgeCBgjDn6St+z8OnsCuxwD3L0U= github.com/olivere/elastic v6.2.37+incompatible/go.mod h1:J+q1zQJTgAz9woqsbVRqGeB5G1iqDKVBWLNSYW8yfJ8= -github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= -github.com/onsi/ginkgo v1.16.2/go.mod h1:CObGmKUOKaSC0RjmoAK7tKyn4Azo5P2IWuoMnvwxz1E= -github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc= -github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= -github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= -github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/onsi/gomega v1.13.0 h1:7lLHu94wT9Ij0o6EWWclhu0aOh32VxhkwEJvzuWPeak= -github.com/onsi/gomega v1.13.0/go.mod h1:lRk9szgn8TxENtWd0Tp4c3wjlRfMTMH27I+3Je41yGY= github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.79.0 h1:OZPeakqoSZ1yRlmGBlWi9kISx/9PJzlNLGLutFPOQY0= github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.79.0/go.mod h1:VOHKYi1wm+/c2wZA3mY1Grd4eYP8uS//EV0yHBbGfGw= github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.79.0 h1:litBmOODNZ5k8XE09FD3T1KOIASXqk7ktIyxyM9xLIs= @@ -572,8 +546,8 @@ github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3v github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ= github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= -github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= -github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= +github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -616,27 +590,23 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= -github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/rs/cors v1.9.0 h1:l9HGsTsHJcvW14Nk7J9KFz8bzeAWXn3CG6bgt7LsrAE= github.com/rs/cors v1.9.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= -github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= -github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= -github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js= github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= @@ -688,7 +658,6 @@ github.com/uber/jaeger-client-go v2.30.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMW github.com/uber/jaeger-lib v2.4.1+incompatible h1:td4jdvLcExb4cBISKIpHuGoVXh+dVKhn2Um6rjCsSsg= github.com/uber/jaeger-lib v2.4.1+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= -github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= @@ -798,10 +767,8 @@ golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.10.0 h1:LKqV2xt9+kDzSTfOhx4FrkEBcMrAgHSYgzywV9zcGmM= @@ -849,7 +816,6 @@ golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -874,7 +840,6 @@ golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= -golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= @@ -889,9 +854,7 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8= golang.org/x/net v0.0.0-20210421230115-4e50805a0758/go.mod h1:72T/g9IO56b78aLF+1Kcs5dz7/ng1VjMUvfKvpfy+jM= -golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM= golang.org/x/net v0.11.0 h1:Gi2tvZIJyBtO9SDr1q9h5hEQCp/4L2RQ+ar0qjx2oNU= @@ -920,10 +883,10 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190129075346-302c3dd5f1cc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -940,14 +903,12 @@ golang.org/x/sys v0.0.0-20190531175056-4c3a928424d2/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -973,7 +934,6 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -989,7 +949,6 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -1068,7 +1027,6 @@ golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82u golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= @@ -1202,16 +1160,12 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= -gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/square/go-jose.v2 v2.3.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/pkg/kafka/consumer/config.go b/pkg/kafka/consumer/config.go index 243c120faeb8..3aa6ef4a3b76 100644 --- a/pkg/kafka/consumer/config.go +++ b/pkg/kafka/consumer/config.go @@ -15,11 +15,10 @@ package consumer import ( - "io" - "time" + "context" + "fmt" "github.com/Shopify/sarama" - cluster "github.com/bsm/sarama-cluster" "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/kafka/auth" @@ -27,9 +26,9 @@ import ( // Consumer is an interface to features of Sarama that are necessary for the consumer type Consumer interface { - Partitions() <-chan cluster.PartitionConsumer - MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) - io.Closer + Consume(ctx context.Context, topics []string, handler sarama.ConsumerGroupHandler) error + Errors() <-chan error + Close() error } // Builder builds a new kafka consumer @@ -52,8 +51,7 @@ type Configuration struct { // NewConsumer creates a new kafka consumer func (c *Configuration) NewConsumer(logger *zap.Logger) (Consumer, error) { - saramaConfig := cluster.NewConfig() - saramaConfig.Group.Mode = cluster.ConsumerModePartitions + saramaConfig := sarama.NewConfig() saramaConfig.ClientID = c.ClientID saramaConfig.RackID = c.RackID if len(c.ProtocolVersion) > 0 { @@ -61,15 +59,14 @@ func (c *Configuration) NewConsumer(logger *zap.Logger) (Consumer, error) { if err != nil { return nil, err } - saramaConfig.Config.Version = ver + saramaConfig.Version = ver } - if err := c.AuthenticationConfig.SetConfiguration(&saramaConfig.Config, logger); err != nil { + if err := c.AuthenticationConfig.SetConfiguration(saramaConfig, logger); err != nil { return nil, err } - // cluster.NewConfig() uses sarama.NewConfig() to create the config. - // However the Jaeger OTEL module pulls in newer samara version (from OTEL collector) - // that does not set saramaConfig.Consumer.Offsets.CommitInterval to its default value 1s. - // then the samara-cluster fails if the default interval is not 1s. - saramaConfig.Consumer.Offsets.CommitInterval = time.Second - return cluster.NewConsumer(c.Brokers, c.GroupID, []string{c.Topic}, saramaConfig) + client, err := sarama.NewConsumerGroup(c.Brokers, c.GroupID, saramaConfig) + if err != nil { + return nil, fmt.Errorf("error creating consumer group client: %w", err) + } + return client, err } diff --git a/plugin/storage/integration/kafka_test.go b/plugin/storage/integration/kafka_test.go index 29f87a89545d..d008b1d0aced 100644 --- a/plugin/storage/integration/kafka_test.go +++ b/plugin/storage/integration/kafka_test.go @@ -99,6 +99,9 @@ func (s *KafkaIntegrationTestSuite) initialize() error { } spanConsumer.Start() + //wait consumer running + spanConsumer.Ready() + s.SpanWriter = spanWriter s.SpanReader = &ingester{traceStore} s.Refresh = func() error { return nil }