Skip to content

Commit

Permalink
add some comments
Browse files Browse the repository at this point in the history
Signed-off-by: axfor <[email protected]>
  • Loading branch information
axfor committed Mar 22, 2023
1 parent 170f77d commit f0a2d53
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions cmd/ingester/app/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {

return nil
}

Expand All @@ -158,6 +157,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
return nil
}

// handleMessages starting message handler
func (c *Consumer) handleMessages(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) {
c.logger.Info("Starting message handler", zap.Int32("partition", claim.Partition()))

Expand Down Expand Up @@ -207,11 +207,11 @@ func (c *Consumer) handleMessages(session sarama.ConsumerGroupSession, claim sar
}
}
}

// closePartition close partition of consumer
func (c *Consumer) closePartition(claim sarama.ConsumerGroupClaim) {
c.logger.Info("Closing partition consumer", zap.Int32("partition", claim.Partition()))
// claim.Close() // blocks until messages channel is drained
c.partitionMetrics(claim.Partition()).closeCounter.Inc(1)
c.logger.Info("Closed partition consumer", zap.Int32("partition", claim.Partition()))
}

// handleErrors handles incoming Kafka consumer errors on a channel
Expand Down

0 comments on commit f0a2d53

Please sign in to comment.