Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit cfef09c
Merge: a5cbf94 f8d219a
Author: axfor <[email protected]>
Date:   Sun Mar 26 14:24:42 2023 +0800

    Merge branch 'upgrade-sarama-v1.38.1' into upgrade-sarama-v1.38.1

commit a5cbf94
Merge: c90f070 13fa044
Author: axfor <[email protected]>
Date:   Sun Mar 26 14:23:35 2023 +0800

    Merge branch 'main' into  upgrade-sarama-v1.38.1

commit f8d219a
Merge: 46f6368 13fa044
Author: axx <[email protected]>
Date:   Sun Mar 26 14:23:11 2023 +0800

    Merge branch 'main' into upgrade-sarama-v1.38.1

commit 46f6368
Merge: 6d75d03 4f231b6
Author: axx <[email protected]>
Date:   Sat Mar 25 15:29:51 2023 +0800

    Merge branch 'main' into upgrade-sarama-v1.38.1

commit 6d75d03
Merge: c90f070 45d2de2
Author: axx <[email protected]>
Date:   Thu Mar 23 18:39:50 2023 +0800

    Merge branch 'main' into upgrade-sarama-v1.38.1

commit c90f070
Author: axfor <[email protected]>
Date:   Thu Mar 23 01:10:54 2023 +0800

    fix unnecessary assignment to the blank identifier (gosimple)

    Signed-off-by: axfor <[email protected]>

commit 90d74dd
Author: axfor <[email protected]>
Date:   Thu Mar 23 00:16:07 2023 +0800

    fix some tips

    Signed-off-by: axfor <[email protected]>

commit f0a2d53
Author: axfor <[email protected]>
Date:   Wed Mar 22 23:55:29 2023 +0800

    add some comments

    Signed-off-by: axfor <[email protected]>

commit 170f77d
Author: axfor <[email protected]>
Date:   Wed Mar 22 23:27:55 2023 +0800

    Fixed the issue of repeatedly closing ready

    Signed-off-by: axfor <[email protected]>

commit e74b66b
Author: axfor <[email protected]>
Date:   Wed Mar 22 22:07:34 2023 +0800

    The consumer interface is changed to a minimum subset

    Signed-off-by: axfor <[email protected]>

commit f8863b5
Merge: 2703cb9 6ab3f01
Author: axfor <[email protected]>
Date:   Wed Mar 22 21:38:09 2023 +0800

    Merge branch 'main'  into upgrade-sarama-v1.38.1

    Signed-off-by: axfor <[email protected]>

commit 2703cb9
Merge: 810629b 66e6610
Author: axfor <[email protected]>
Date:   Wed Mar 22 21:30:14 2023 +0800

    Merge branch 'upgrade-sarama-v1.38.1'  into upgrade-sarama-v1.38.1

commit 810629b
Author: axfor <[email protected]>
Date:   Wed Mar 22 21:29:40 2023 +0800

    add func ready of consumer and remove commitInterval config

    Signed-off-by: axfor <[email protected]>

commit 66e6610
Merge: ec4e958 06c59e7
Author: axx <[email protected]>
Date:   Sun Mar 19 16:29:14 2023 +0800

    Merge branch 'main' into upgrade-sarama-v1.38.1

commit ec4e958
Author: axfor <[email protected]>
Date:   Sun Mar 19 04:16:35 2023 +0800

    Remove the Consumer's partitionMapLock field and change the partitionsHeld type from int64 to atomic.Int64

    Signed-off-by: axfor <[email protected]>

commit 1556879
Author: axfor <[email protected]>
Date:   Sun Mar 19 02:07:00 2023 +0800

    fixed some typos

    Signed-off-by: axfor <[email protected]>

commit 28dc496
Merge: 7294b58 c46d6ca
Author: axfor <[email protected]>
Date:   Sun Mar 19 01:58:04 2023 +0800

    Merge branch 'upgrade-sarama-v1.38.1'  into upgrade-sarama-v1.38.1

commit 7294b58
Author: axfor <[email protected]>
Date:   Sun Mar 19 01:56:53 2023 +0800

    fixed some errors in the integration tests

    Signed-off-by: axfor <[email protected]>

commit c46d6ca
Merge: 90ddffc 43c29a0
Author: axx <[email protected]>
Date:   Sat Mar 18 22:15:21 2023 +0800

    Merge branch 'main' into upgrade-sarama-v1.38.1

commit 90ddffc
Author: axfor <[email protected]>
Date:   Sat Mar 18 22:11:12 2023 +0800

    Improved log printing when a new ConsumerGroup object fails

    Signed-off-by: axfor <[email protected]>

commit 655a6dd
Merge: 3a82b7c 18843de
Author: axfor <[email protected]>
Date:   Sun Mar 12 17:13:54 2023 +0800

    Merge branch 'main' code

    Signed-off-by: axfor <[email protected]>

commit 3a82b7c
Author: axfor <[email protected]>
Date:   Sun Mar 12 17:09:42 2023 +0800

    update some comment

    Signed-off-by: axfor <[email protected]>

commit e077b09
Author: axfor <[email protected]>
Date:   Sun Mar 12 16:43:57 2023 +0800

    improve some of the tests

    Signed-off-by: axfor <[email protected]>

commit 574810a
Author: Clement <[email protected]>
Date:   Sun Mar 12 03:02:00 2023 +0800

    [chore]: Replace pkg/multierror with standard errors.Join (jaegertracing#4293)
    Resolves jaegertracing#4292

    Signed-off-by: Clement <[email protected]>
    Signed-off-by: axfor <[email protected]>

commit b7ab7fd
Author: axfor <[email protected]>
Date:   Sun Mar 12 00:06:22 2023 +0800

    Bump github.com/Shopify/sarama from v1.33.0 to v1.38.1

    Signed-off-by: axfor <[email protected]>
  • Loading branch information
axfor committed Jun 23, 2023
1 parent 3ed0928 commit 5c34f4d
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 392 deletions.
146 changes: 99 additions & 47 deletions cmd/ingester/app/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
package consumer

import (
"context"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/Shopify/sarama"
sc "github.com/bsm/sarama-cluster"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
Expand All @@ -46,16 +48,15 @@ type Consumer struct {

deadlockDetector deadlockDetector

partitionIDToState map[int32]*consumerState
partitionMapLock sync.Mutex
partitionsHeld int64
partitionsHeld atomic.Int64
partitionsHeldGauge metrics.Gauge

doneWg sync.WaitGroup
}
doneWg sync.WaitGroup
ready chan struct{}
readyCloseLock sync.Mutex

type consumerState struct {
partitionConsumer sc.PartitionConsumer
topic string
cancel context.CancelFunc
}

// New is a constructor for a Consumer
Expand All @@ -67,25 +68,37 @@ func New(params Params) (*Consumer, error) {
internalConsumer: params.InternalConsumer,
processorFactory: params.ProcessorFactory,
deadlockDetector: deadlockDetector,
partitionIDToState: make(map[int32]*consumerState),
partitionsHeldGauge: partitionsHeldGauge(params.MetricsFactory),
ready: make(chan struct{}),
topic: params.ProcessorFactory.topic,
}, nil
}

// Start begins consuming messages in a go routine
func (c *Consumer) Start() {
c.deadlockDetector.start()
ctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel
c.doneWg.Add(1)

go func() {
c.logger.Info("Starting main loop")
for pc := range c.internalConsumer.Partitions() {
c.partitionMapLock.Lock()
c.partitionIDToState[pc.Partition()] = &consumerState{partitionConsumer: pc}
c.partitionMapLock.Unlock()
c.partitionMetrics(pc.Partition()).startCounter.Inc(1)

c.doneWg.Add(2)
go c.handleMessages(pc)
go c.handleErrors(pc.Partition(), pc.Errors())
defer c.doneWg.Done()
for {
select {
case <-ctx.Done():
c.logger.Error("ctx canceld")
return
default:
c.logger.Info("Topic", zap.Strings("topic", strings.Split(c.topic, ",")))
if err := c.internalConsumer.Consume(ctx, strings.Split(c.topic, ","), c); err != nil {
c.logger.Error("Error from consumer", zap.Error(err))
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
return
}
c.ready = make(chan struct{})
}
}
}()
}
Expand All @@ -98,51 +111,85 @@ func (c *Consumer) Close() error {

c.logger.Debug("Closing deadlock detector")
c.deadlockDetector.close()
if c.cancel != nil {
c.cancel()
}

c.logger.Debug("Waiting for messages and errors to be handled")
c.doneWg.Wait()

return err
}

// handleMessages handles incoming Kafka messages on a channel
func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition()))
c.partitionMapLock.Lock()
c.partitionsHeld++
c.partitionsHeldGauge.Update(c.partitionsHeld)
c.partitionMapLock.Unlock()
// Ready blocks the caller until the consumer's ready channel is closed,
// which Setup does at the beginning of a consumer-group session.
func (c *Consumer) Ready() {
	readyCh := c.ready
	<-readyCh
}

// Setup is run at the beginning of a new session, before ConsumeClaim.
// It marks the consumer as ready by closing the ready channel. The lock and
// the non-blocking receive guard against closing the channel a second time.
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
	c.readyCloseLock.Lock()
	defer c.readyCloseLock.Unlock()

	select {
	case <-c.ready:
		// The receive succeeded, so the channel is treated as already
		// closed and must not be closed again.
		return nil
	default:
	}

	close(c.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines
// have exited. It performs no work here and always returns nil.
func (c *Consumer) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// It counts the partition start, spawns an error handler goroutine, and then
// runs the message loop on the calling goroutine until that loop returns.
// It always returns nil.
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	startMetrics := c.partitionMetrics(claim.Partition())
	startMetrics.startCounter.Inc(1)

	// Two units of work are registered: one released by handleErrors and one
	// by handleMessages when each of them finishes.
	c.doneWg.Add(2)

	// NOTE(review): the error channel comes from the consumer as a whole, not
	// from this claim, so every claim starts its own reader on the same
	// channel — confirm these goroutines exit when the claim ends.
	partition := claim.Partition()
	errCh := c.internalConsumer.Errors()
	go c.handleErrors(partition, errCh)

	c.handleMessages(session, claim)
	return nil
}

// handleMessages runs the message-handling loop for a claimed partition until the session ends or the partition is closed due to inactivity.
func (c *Consumer) handleMessages(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) {
c.logger.Info("Starting message handler", zap.Int32("partition", claim.Partition()))

c.partitionsHeld.Add(1)
c.partitionsHeldGauge.Update(c.partitionsHeld.Load())

defer func() {
c.closePartition(pc)
c.partitionMapLock.Lock()
c.partitionsHeld--
c.partitionsHeldGauge.Update(c.partitionsHeld)
c.partitionMapLock.Unlock()
c.closePartition(claim)
c.partitionsHeld.Add(-1)
c.partitionsHeldGauge.Update(c.partitionsHeld.Load())
c.doneWg.Done()
}()

msgMetrics := c.newMsgMetrics(pc.Partition())
msgMetrics := c.newMsgMetrics(claim.Partition())

var msgProcessor processor.SpanProcessor

deadlockDetector := c.deadlockDetector.startMonitoringForPartition(pc.Partition())
deadlockDetector := c.deadlockDetector.startMonitoringForPartition(claim.Partition())
defer deadlockDetector.close()

for {
select {
case msg, ok := <-pc.Messages():
if !ok {
c.logger.Info("Message channel closed. ", zap.Int32("partition", pc.Partition()))
return
}
case msg := <-claim.Messages():

c.logger.Debug("Got msg", zap.Any("msg", msg))
msgMetrics.counter.Inc(1)
msgMetrics.offsetGauge.Update(msg.Offset)
msgMetrics.lagGauge.Update(pc.HighWaterMarkOffset() - msg.Offset - 1)
msgMetrics.lagGauge.Update(claim.HighWaterMarkOffset() - msg.Offset - 1)
deadlockDetector.incrementMsgCount()

if msgProcessor == nil {
msgProcessor = c.processorFactory.new(pc.Partition(), msg.Offset-1)
msgProcessor = c.processorFactory.new(session, claim, msg.Offset-1)
defer msgProcessor.Close()
}

Expand All @@ -151,22 +198,27 @@ func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
c.logger.Error("Failed to process a Kafka message", zap.Error(err), zap.Int32("partition", msg.Partition), zap.Int64("offset", msg.Offset))
}

// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/Shopify/sarama/issues/1192
case <-session.Context().Done():
c.logger.Info("Session done", zap.Int32("partition", claim.Partition()))
return
case <-deadlockDetector.closePartitionChannel():
c.logger.Info("Closing partition due to inactivity", zap.Int32("partition", pc.Partition()))
c.logger.Info("Closing partition due to inactivity", zap.Int32("partition", claim.Partition()))
return
}
}
}

func (c *Consumer) closePartition(partitionConsumer sc.PartitionConsumer) {
c.logger.Info("Closing partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
partitionConsumer.Close() // blocks until messages channel is drained
c.partitionMetrics(partitionConsumer.Partition()).closeCounter.Inc(1)
c.logger.Info("Closed partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
// closePartition logs that the claimed partition is closing and increments its close counter.
func (c *Consumer) closePartition(claim sarama.ConsumerGroupClaim) {
c.logger.Info("Closing partition consumer", zap.Int32("partition", claim.Partition()))
c.partitionMetrics(claim.Partition()).closeCounter.Inc(1)
}

// handleErrors handles incoming Kafka consumer errors on a channel
func (c *Consumer) handleErrors(partition int32, errChan <-chan *sarama.ConsumerError) {
func (c *Consumer) handleErrors(partition int32, errChan <-chan error) {
c.logger.Info("Starting error handler", zap.Int32("partition", partition))
defer c.doneWg.Done()

Expand Down
Loading

0 comments on commit 5c34f4d

Please sign in to comment.