From f0a2d53fbade1b40a60f6d76dbb25d3f0e19146c Mon Sep 17 00:00:00 2001 From: axfor Date: Wed, 22 Mar 2023 23:55:29 +0800 Subject: [PATCH] add some comments Signed-off-by: axfor --- cmd/ingester/app/consumer/consumer.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go index bff9a2a20ac0..2cfdf364d927 100644 --- a/cmd/ingester/app/consumer/consumer.go +++ b/cmd/ingester/app/consumer/consumer.go @@ -144,7 +144,6 @@ func (c *Consumer) Setup(sarama.ConsumerGroupSession) error { // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error { - return nil } @@ -158,6 +157,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram return nil } +// handleMessages starting message handler func (c *Consumer) handleMessages(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) { c.logger.Info("Starting message handler", zap.Int32("partition", claim.Partition())) @@ -207,11 +207,11 @@ func (c *Consumer) handleMessages(session sarama.ConsumerGroupSession, claim sar } } } + +// closePartition close partition of consumer func (c *Consumer) closePartition(claim sarama.ConsumerGroupClaim) { c.logger.Info("Closing partition consumer", zap.Int32("partition", claim.Partition())) - // claim.Close() // blocks until messages channel is drained c.partitionMetrics(claim.Partition()).closeCounter.Inc(1) - c.logger.Info("Closed partition consumer", zap.Int32("partition", claim.Partition())) } // handleErrors handles incoming Kafka consumer errors on a channel