ingest consumer: more granular error handling, committer sanity check (#6951)

* ingest consumer: more granular error handling, committer sanity check

* check that the offset we're committing actually comes from the partition we're committing to
* process a fetch even when it contains some errors; this allows us to consume fetches with partial data

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Record total number of fetches

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Shortcut on 0 errors

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Work only on non-err fetches

Signed-off-by: Dimitar Dimitrov <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
dimitarvdimitrov authored Dec 18, 2023
1 parent 4170afa commit 9de67c5
Showing 1 changed file with 47 additions and 12 deletions.
59 changes: 47 additions & 12 deletions pkg/storage/ingest/reader.go
@@ -4,6 +4,7 @@ package ingest
 
 import (
 	"context"
+	"fmt"
 	"math"
 	"strconv"
 	"time"
@@ -116,31 +117,50 @@ func (r *PartitionReader) run(ctx context.Context) error {
 
 	for ctx.Err() == nil {
 		fetches := r.client.PollFetches(ctx)
-		if fetches.Err() != nil {
-			if errors.Is(fetches.Err(), context.Canceled) {
-				return nil
-			}
-			err := collectFetchErrs(fetches)
-			level.Error(r.logger).Log("msg", "encountered error while fetching", "err", err)
-			continue
-		}
-
 		r.recordFetchesMetrics(fetches)
+		r.logFetchErrs(fetches)
+		fetches = filterOutErrFetches(fetches)
 		r.consumeFetches(consumeCtx, fetches)
 		r.enqueueCommit(fetches)
 	}
 
 	return nil
 }
 
-func collectFetchErrs(fetches kgo.Fetches) (_ error) {
+func filterOutErrFetches(fetches kgo.Fetches) kgo.Fetches {
+	filtered := make(kgo.Fetches, 0, len(fetches))
+	for i, fetch := range fetches {
+		if !isErrFetch(fetch) {
+			filtered = append(filtered, fetches[i])
+		}
+	}
+
+	return filtered
+}
+
+func isErrFetch(fetch kgo.Fetch) bool {
+	for _, t := range fetch.Topics {
+		for _, p := range t.Partitions {
+			if p.Err != nil {
+				return true
+			}
+		}
+	}
+	return false
+}
+
+func (r *PartitionReader) logFetchErrs(fetches kgo.Fetches) {
 	mErr := multierror.New()
 	fetches.EachError(func(s string, i int32, err error) {
 		// kgo advises to "restart" the kafka client if the returned error is a kerr.Error.
 		// Recreating the client would cause duplicate metrics registration, so we don't do it for now.
-		mErr.Add(err)
+		mErr.Add(fmt.Errorf("topic %q, partition %d: %w", s, i, err))
 	})
-	return mErr.Err()
+	if len(mErr) == 0 {
+		return
+	}
+	r.metrics.fetchesErrors.Add(float64(len(mErr)))
+	level.Error(r.logger).Log("msg", "encountered error while fetching", "err", mErr.Err())
 }
 
 func (r *PartitionReader) enqueueCommit(fetches kgo.Fetches) {
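The two helpers above let the reader keep the clean fetches from a poll and drop only the ones carrying a partition error. A minimal test-style sketch of that behaviour (not part of this commit; the topic name and test name are illustrative, and it assumes franz-go's kgo types plus the helpers defined in the hunk above):

// reader_test.go (hypothetical): a fetch whose partition carries an error is
// dropped wholesale, while a clean fetch from the same poll is kept.
package ingest

import (
	"errors"
	"testing"

	"github.com/twmb/franz-go/pkg/kgo"
)

func TestFilterOutErrFetches_keepsCleanFetches(t *testing.T) {
	errFetch := kgo.Fetch{Topics: []kgo.FetchTopic{{
		Topic:      "ingest",
		Partitions: []kgo.FetchPartition{{Partition: 1, Err: errors.New("broker went away")}},
	}}}
	okFetch := kgo.Fetch{Topics: []kgo.FetchTopic{{
		Topic:      "ingest",
		Partitions: []kgo.FetchPartition{{Partition: 1, Records: []*kgo.Record{{Offset: 42}}}},
	}}}

	filtered := filterOutErrFetches(kgo.Fetches{errFetch, okFetch})

	if len(filtered) != 1 || filtered[0].Topics[0].Partitions[0].Records[0].Offset != 42 {
		t.Fatalf("expected only the clean fetch to survive, got %+v", filtered)
	}
}

Note the granularity: filtering is per fetch, not per record, so if any partition in a fetch reports an error, the whole fetch is skipped.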
@@ -149,6 +169,10 @@ func (r *PartitionReader) enqueueCommit(fetches kgo.Fetches) {
 	}
 	lastOffset := int64(0)
 	fetches.EachPartition(func(partition kgo.FetchTopicPartition) {
+		if partition.Partition != r.partitionID {
+			level.Error(r.logger).Log("msg", "asked to commit wrong partition", "partition", partition.Partition, "expected_partition", r.partitionID)
+			return
+		}
 		lastOffset = partition.Records[len(partition.Records)-1].Offset
 	})
 	r.committer.enqueueOffset(lastOffset)
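This guard is the "committer sanity check" from the commit message: an offset is only committed if it was fetched from this reader's own partition. A hypothetical standalone variant of the same check (the helper name lastOwnedOffset is not in the commit), with an extra guard for empty record sets:

// Hypothetical, simplified version of the guard above: return the last
// offset only if it comes from the partition this reader owns.
package ingest

import "github.com/twmb/franz-go/pkg/kgo"

func lastOwnedOffset(fetches kgo.Fetches, ownPartition int32) (offset int64, ok bool) {
	fetches.EachPartition(func(p kgo.FetchTopicPartition) {
		if p.Partition != ownPartition || len(p.Records) == 0 {
			// Either not our partition (never commit someone else's offset)
			// or an empty partition fetch: indexing Records[len-1] would panic.
			return
		}
		offset = p.Records[len(p.Records)-1].Offset
		ok = true
	})
	return offset, ok
}

The len(p.Records) == 0 check matters in this standalone form because Records[len(Records)-1] panics on an empty slice; the committed code presumably relies on enqueueCommit's earlier early-return (the closing brace at the top of this hunk) to rule that out.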
@@ -207,6 +231,7 @@ func (r *PartitionReader) recordFetchesMetrics(fetches kgo.Fetches) {
 		r.metrics.receiveDelay.Observe(now.Sub(record.Timestamp).Seconds())
 	})
 
+	r.metrics.fetchesTotal.Add(float64(len(fetches)))
 	r.metrics.recordsPerFetch.Observe(float64(numRecords))
 }
 
@@ -342,6 +367,8 @@ func (r *partitionCommitter) run(ctx context.Context) error {
 type readerMetrics struct {
 	receiveDelay    prometheus.Summary
 	recordsPerFetch prometheus.Histogram
+	fetchesErrors   prometheus.Counter
+	fetchesTotal    prometheus.Counter
 	kprom           *kprom.Metrics
 }
 
@@ -361,6 +388,14 @@ func newReaderMetrics(partitionID int32, reg prometheus.Registerer) readerMetrics {
 			Help:    "The number of records received by the consumer in a single fetch operation.",
 			Buckets: prometheus.ExponentialBuckets(1, 2, 15),
 		}),
+		fetchesErrors: factory.NewCounter(prometheus.CounterOpts{
+			Name: "cortex_ingest_storage_reader_fetch_errors_total",
+			Help: "The number of fetch errors encountered by the consumer.",
+		}),
+		fetchesTotal: factory.NewCounter(prometheus.CounterOpts{
+			Name: "cortex_ingest_storage_reader_fetches_total",
+			Help: "Total number of Kafka fetches received by the consumer.",
+		}),
 		kprom: kprom.NewMetrics("cortex_ingest_storage_reader",
 			kprom.Registerer(prometheus.WrapRegistererWith(prometheus.Labels{"partition": strconv.Itoa(int(partitionID))}, reg)),
 			// Do not export the client ID, because we use it to specify options to the backend.
