From ec4e958c613badbbecc825eba30fca306ac39396 Mon Sep 17 00:00:00 2001 From: axfor <aixiaoxiang2009@hotmail.com> Date: Sun, 19 Mar 2023 04:16:35 +0800 Subject: [PATCH] Remove the Consumer's partitionMapLock field and change the partitionsHeld type from int64 to atomic.int64 Signed-off-by: axfor <aixiaoxiang2009@hotmail.com> --- cmd/ingester/app/consumer/consumer.go | 18 ++++++------- cmd/ingester/app/consumer/consumer_test.go | 31 ---------------------- 2 files changed, 8 insertions(+), 41 deletions(-) diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go index a725704cbdcb..0c70b2957bf3 100644 --- a/cmd/ingester/app/consumer/consumer.go +++ b/cmd/ingester/app/consumer/consumer.go @@ -18,6 +18,7 @@ import ( "context" "strings" "sync" + "sync/atomic" "time" "github.com/Shopify/sarama" @@ -47,8 +48,7 @@ type Consumer struct { deadlockDetector deadlockDetector - partitionMapLock sync.Mutex - partitionsHeld int64 + partitionsHeld atomic.Int64 partitionsHeldGauge metrics.Gauge doneWg sync.WaitGroup @@ -140,16 +140,14 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram func (c *Consumer) handleMessages(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) { c.logger.Info("Starting message handler", zap.Int32("partition", claim.Partition())) - c.partitionMapLock.Lock() - c.partitionsHeld++ - c.partitionsHeldGauge.Update(c.partitionsHeld) - c.partitionMapLock.Unlock() + + c.partitionsHeld.Add(1) + c.partitionsHeldGauge.Update(c.partitionsHeld.Load()) + defer func() { c.closePartition(claim) - c.partitionMapLock.Lock() - c.partitionsHeld-- - c.partitionsHeldGauge.Update(c.partitionsHeld) - c.partitionMapLock.Unlock() + c.partitionsHeld.Add(-1) + c.partitionsHeldGauge.Update(c.partitionsHeld.Load()) c.doneWg.Done() }() diff --git a/cmd/ingester/app/consumer/consumer_test.go b/cmd/ingester/app/consumer/consumer_test.go index 96ee07f9a6d7..c32f5ee4747b 100644 --- a/cmd/ingester/app/consumer/consumer_test.go +++ b/cmd/ingester/app/consumer/consumer_test.go @@ -17,12 +17,8 @@ package consumer import ( "testing" - "github.com/jaegertracing/jaeger/cmd/ingester/app/processor" - "github.com/jaegertracing/jaeger/pkg/kafka/consumer" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap" kmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks" ) @@ -41,33 +37,6 @@ func TestConstructor(t *testing.T) { assert.NotNil(t, newConsumer) } -func newConsumer( - t *testing.T, - metricsFactory metrics.Factory, - topic string, - processor processor.SpanProcessor, - consumer consumer.Consumer, -) *Consumer { - logger, _ := zap.NewDevelopment() - consumerParams := Params{ - MetricsFactory: metricsFactory, - Logger: logger, - InternalConsumer: consumer, - ProcessorFactory: ProcessorFactory{ - topic: topic, - consumer: consumer, - metricsFactory: metricsFactory, - logger: logger, - baseProcessor: processor, - parallelism: 1, - }, - } - - c, err := New(consumerParams) - require.NoError(t, err) - return c -} - func TestSaramaConsumerWrapper_MarkPartitionOffset(t *testing.T) { sc := &kmocks.Consumer{} metadata := "meatbag"