Skip to content

Commit

Permalink
add func ready of consumer and remove commitInterval config
Browse files Browse the repository at this point in the history
Signed-off-by: axfor <[email protected]>
  • Loading branch information
axfor committed Mar 22, 2023
1 parent ec4e958 commit 810629b
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 11 deletions.
14 changes: 10 additions & 4 deletions cmd/ingester/app/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Consumer struct {
partitionsHeldGauge metrics.Gauge

doneWg sync.WaitGroup
ready chan struct{}

topic string
cancel context.CancelFunc
Expand All @@ -67,6 +68,7 @@ func New(params Params) (*Consumer, error) {
processorFactory: params.ProcessorFactory,
deadlockDetector: deadlockDetector,
partitionsHeldGauge: partitionsHeldGauge(params.MetricsFactory),
ready: make(chan struct{}),
topic: params.ProcessorFactory.topic,
}, nil
}
Expand All @@ -78,25 +80,22 @@ func (c *Consumer) Start() {
c.cancel = cancel
c.doneWg.Add(1)

var fristStart sync.WaitGroup
fristStart.Add(1)
go func() {
fristStart.Done()
defer c.doneWg.Done()
for {
select {
case <-ctx.Done():
c.logger.Error("ctx canceld")
return
default:
c.ready = make(chan struct{})
c.logger.Info("Topic", zap.Strings("topic", strings.Split(c.topic, ",")))
if err := c.internalConsumer.Consume(ctx, strings.Split(c.topic, ","), c); err != nil {
c.logger.Error("Error from consumer", zap.Error(err))
}
}
}
}()
fristStart.Wait()
}

// Close closes the Consumer and underlying sarama consumer
Expand All @@ -117,14 +116,21 @@ func (c *Consumer) Close() error {
return err
}

// Ready is consumer running
func (c *Consumer) Ready() {
<-c.ready
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(c.ready)
return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {

return nil
}

Expand Down
6 changes: 0 additions & 6 deletions pkg/kafka/consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package consumer

import (
"fmt"
"time"

"github.com/Shopify/sarama"
"go.uber.org/zap"
Expand Down Expand Up @@ -62,11 +61,6 @@ func (c *Configuration) NewConsumer(logger *zap.Logger) (Consumer, error) {
if err := c.AuthenticationConfig.SetConfiguration(saramaConfig, logger); err != nil {
return nil, err
}
// cluster.NewConfig() uses sarama.NewConfig() to create the config.
// However the Jaeger OTEL module pulls in newer samara version (from OTEL collector)
// that does not set saramaConfig.Consumer.Offsets.CommitInterval to its default value 1s.
// then the samara-cluster fails if the default interval is not 1s.
saramaConfig.Consumer.Offsets.CommitInterval = time.Second
client, err := sarama.NewConsumerGroup(c.Brokers, c.GroupID, saramaConfig)
if err != nil {
return nil, fmt.Errorf("error creating consumer group client: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/integration/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (s *KafkaIntegrationTestSuite) initialize() error {
spanConsumer.Start()

//wait consumer running
time.Sleep(time.Second * 5)
spanConsumer.Ready()

s.SpanWriter = spanWriter
s.SpanReader = &ingester{traceStore}
Expand Down

0 comments on commit 810629b

Please sign in to comment.