Skip to content

Commit

Permalink
Remove the Consumer's partitionMapLock field and change the partition…
Browse files Browse the repository at this point in the history
…sHeld type from int64 to atomic.int64

Signed-off-by: axfor <[email protected]>
  • Loading branch information
axfor committed Mar 18, 2023
1 parent 1556879 commit ec4e958
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 41 deletions.
18 changes: 8 additions & 10 deletions cmd/ingester/app/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/Shopify/sarama"
Expand Down Expand Up @@ -47,8 +48,7 @@ type Consumer struct {

deadlockDetector deadlockDetector

partitionMapLock sync.Mutex
partitionsHeld int64
partitionsHeld atomic.Int64
partitionsHeldGauge metrics.Gauge

doneWg sync.WaitGroup
Expand Down Expand Up @@ -140,16 +140,14 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram

func (c *Consumer) handleMessages(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) {
c.logger.Info("Starting message handler", zap.Int32("partition", claim.Partition()))
c.partitionMapLock.Lock()
c.partitionsHeld++
c.partitionsHeldGauge.Update(c.partitionsHeld)
c.partitionMapLock.Unlock()

c.partitionsHeld.Add(1)
c.partitionsHeldGauge.Update(c.partitionsHeld.Load())

defer func() {
c.closePartition(claim)
c.partitionMapLock.Lock()
c.partitionsHeld--
c.partitionsHeldGauge.Update(c.partitionsHeld)
c.partitionMapLock.Unlock()
c.partitionsHeld.Add(-1)
c.partitionsHeldGauge.Update(c.partitionsHeld.Load())
c.doneWg.Done()
}()

Expand Down
31 changes: 0 additions & 31 deletions cmd/ingester/app/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,8 @@ package consumer
import (
"testing"

"github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
"github.com/jaegertracing/jaeger/pkg/kafka/consumer"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

kmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks"
)
Expand All @@ -41,33 +37,6 @@ func TestConstructor(t *testing.T) {
assert.NotNil(t, newConsumer)
}

func newConsumer(
t *testing.T,
metricsFactory metrics.Factory,
topic string,
processor processor.SpanProcessor,
consumer consumer.Consumer,
) *Consumer {
logger, _ := zap.NewDevelopment()
consumerParams := Params{
MetricsFactory: metricsFactory,
Logger: logger,
InternalConsumer: consumer,
ProcessorFactory: ProcessorFactory{
topic: topic,
consumer: consumer,
metricsFactory: metricsFactory,
logger: logger,
baseProcessor: processor,
parallelism: 1,
},
}

c, err := New(consumerParams)
require.NoError(t, err)
return c
}

func TestSaramaConsumerWrapper_MarkPartitionOffset(t *testing.T) {
sc := &kmocks.Consumer{}
metadata := "meatbag"
Expand Down

0 comments on commit ec4e958

Please sign in to comment.