fix(block-builder): return from Process call early if max offset is reached #15073

Merged 4 commits on Nov 25, 2024
28 changes: 23 additions & 5 deletions pkg/blockbuilder/controller.go
@@ -5,6 +5,8 @@ import (
"fmt"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/dskit/backoff"
@@ -68,11 +70,13 @@ type PartitionJobController struct {
part partition.Reader
backoff backoff.Config
decoder *kafka.Decoder
logger log.Logger
}

func NewPartitionJobController(
controller partition.Reader,
backoff backoff.Config,
logger log.Logger,
) (*PartitionJobController, error) {
decoder, err := kafka.NewDecoder()
if err != nil {
@@ -83,6 +87,11 @@ func NewPartitionJobController(
part: controller,
backoff: backoff,
decoder: decoder,
logger: log.With(logger,
"component", "job-controller",
"topic", controller.Topic(),
"partition", controller.Partition(),
),
}, nil
}

@@ -125,9 +134,9 @@ func (l *PartitionJobController) Process(ctx context.Context, offsets Offsets, c
err error
)

for boff.Ongoing() {
for lastOffset < offsets.Max && boff.Ongoing() {
var records []partition.Record
records, err = l.part.Poll(ctx)
records, err = l.part.Poll(ctx, int(offsets.Max-lastOffset))
if err != nil {
boff.Wait()
continue
@@ -143,11 +152,11 @@

converted := make([]AppendInput, 0, len(records))
for _, record := range records {
offset := records[len(records)-1].Offset
if offset >= offsets.Max {
if record.Offset >= offsets.Max {
level.Debug(l.logger).Log("msg", "record offset exceeds job max offset. stop processing", "record offset", record.Offset, "max offset", offsets.Max)
break
}
lastOffset = offset
lastOffset = record.Offset

stream, labels, err := l.decoder.Decode(record.Content)
if err != nil {
@@ -163,7 +172,9 @@ func (l *PartitionJobController) Process(ctx context.Context, offsets Offsets, c
labelsStr: stream.Labels,
entries: stream.Entries,
})
}

if len(converted) > 0 {
select {
case ch <- converted:
case <-ctx.Done():
Expand Down Expand Up @@ -198,7 +209,14 @@ func (l *PartitionJobController) LoadJob(ctx context.Context) (bool, Job, error)
if err != nil {
return false, Job{}, err
}

if highestOffset < committedOffset {
level.Error(l.logger).Log("msg", "partition highest offset is less than committed offset", "highest", highestOffset, "committed", committedOffset)
return false, Job{}, fmt.Errorf("partition highest offset is less than committed offset")
}

if highestOffset == committedOffset {
level.Info(l.logger).Log("msg", "no pending records to process")
return false, Job{}, nil
}

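The controller.go changes bound each Process call to its job's offset range: the poll loop now also checks lastOffset against offsets.Max, each poll asks for at most the number of records still missing, converted batches are only sent on the channel when non-empty, and LoadJob fails fast when the committed offset is ahead of the partition's highest offset and returns no job when the two are equal. Below is a minimal, self-contained sketch of the bounded-consumption idea rather than the PR's exact code; the record type, the poll callback, and the convention that Max is an exclusive upper bound are assumptions made for illustration.

```go
// Minimal sketch of the bounded-consumption loop, assuming max is the
// exclusive upper bound of the job's offset range. The record type and the
// poll callback are placeholders, not the real partition.Reader API.
package main

import "fmt"

type record struct {
	Offset int64
}

// consumeJob polls until every offset in [min, max) has been seen, asking for
// at most the number of records still owed so the reader never over-fetches.
func consumeJob(poll func(max int) []record, min, max int64) int64 {
	lastOffset := min - 1
	for lastOffset < max-1 {
		records := poll(int(max - 1 - lastOffset))
		if len(records) == 0 {
			break // a real implementation would back off and retry here
		}
		for _, rec := range records {
			if rec.Offset >= max {
				return lastOffset // record belongs to the next job; stop early
			}
			lastOffset = rec.Offset
		}
	}
	return lastOffset
}

func main() {
	next := int64(0)
	// Fake broker: returns up to three sequential records per poll.
	poll := func(max int) []record {
		out := []record{}
		for i := 0; i < max && i < 3; i++ {
			out = append(out, record{Offset: next})
			next++
		}
		return out
	}
	fmt.Println(consumeJob(poll, 0, 10)) // 9: the loop stops at the job's max offset
}
```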
39 changes: 26 additions & 13 deletions pkg/blockbuilder/slimgester.go
@@ -141,7 +141,7 @@ func (i *BlockBuilder) running(ctx context.Context) error {
default:
_, err := i.runOne(ctx)
if err != nil {
return err
level.Error(i.logger).Log("msg", "block builder run failed", "err", err)
}
}

@@ -157,7 +157,7 @@ func (i *BlockBuilder) running(ctx context.Context) error {
"err", err,
)
if err != nil {
return err
level.Error(i.logger).Log("msg", "block builder run failed", "err", err)
}
}
}
@@ -213,6 +213,8 @@ func (i *BlockBuilder) runOne(ctx context.Context) (skipped bool, err error) {
level.Debug(logger).Log(
"msg", "finished loading records",
"ctx_error", ctx.Err(),
"last_offset", lastOffset,
"total_records", lastOffset-job.Offsets.Min,
)
close(inputCh)
return nil
@@ -305,6 +307,7 @@ func (i *BlockBuilder) runOne(ctx context.Context) (skipped bool, err error) {
func() (res struct{}, err error) {
err = i.store.PutOne(ctx, chk.From, chk.Through, *chk)
if err != nil {
level.Error(logger).Log("msg", "failed to flush chunk", "err", err)
i.metrics.chunksFlushFailures.Inc()
return
}
@@ -320,6 +323,10 @@ func (i *BlockBuilder) runOne(ctx context.Context) (skipped bool, err error) {
Entries: uint32(chk.Data.Entries()),
}
err = indexer.Append(chk.UserID, chk.Metric, chk.ChunkRef.Fingerprint, index.ChunkMetas{meta})
if err != nil {
level.Error(logger).Log("msg", "failed to append chunk to index", "err", err)
}

return
},
); err != nil {
@@ -346,24 +353,30 @@ func (i *BlockBuilder) runOne(ctx context.Context) (skipped bool, err error) {

built, err := indexer.create(ctx, nodeName, tableRanges)
if err != nil {
level.Error(logger).Log("msg", "failed to build index", "err", err)
return false, err
}

u := newUploader(i.objStore)
for _, db := range built {
u := newUploader(i.objStore)
if err := u.Put(ctx, db); err != nil {
level.Error(util_log.Logger).Log(
"msg", "failed to upload tsdb",
"path", db.id.Path(),
)
if _, err := withBackoff(ctx, i.cfg.Backoff, func() (res struct{}, err error) {
err = u.Put(ctx, db)
if err != nil {
level.Error(util_log.Logger).Log(
"msg", "failed to upload tsdb",
"path", db.id.Path(),
)
return
}

level.Debug(logger).Log(
"msg", "uploaded tsdb",
"name", db.id.Name(),
)
return
}); err != nil {
return false, err
}

level.Debug(logger).Log(
"msg", "uploaded tsdb",
"name", db.id.Name(),
)
}

if lastOffset <= job.Offsets.Min {
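In slimgester.go, a failed run no longer terminates the block builder's running loop; the error is logged and the next tick retries. Chunk-flush and index-append failures now produce explicit error logs, and TSDB uploads are wrapped in the package's withBackoff helper so that a transient object-store error is retried instead of silently dropping the upload. As a rough illustration, a generic helper of that shape could be built on github.com/grafana/dskit/backoff as follows; the real helper already exists in pkg/blockbuilder and may differ in signature and details.

```go
// Hedged sketch of a withBackoff-style retry wrapper built on dskit's
// backoff package. This approximates the helper used in the diff above;
// it is not the actual pkg/blockbuilder implementation.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/grafana/dskit/backoff"
)

func withBackoff[T any](ctx context.Context, cfg backoff.Config, fn func() (T, error)) (T, error) {
	var zero T
	var lastErr error
	boff := backoff.New(ctx, cfg)
	for boff.Ongoing() {
		res, err := fn()
		if err == nil {
			return res, nil
		}
		lastErr = err
		boff.Wait() // sleep with exponential backoff before the next attempt
	}
	if lastErr == nil {
		lastErr = boff.Err() // context cancelled or retries exhausted
	}
	return zero, lastErr
}

func main() {
	cfg := backoff.Config{MinBackoff: 100 * time.Millisecond, MaxBackoff: time.Second, MaxRetries: 3}
	attempts := 0
	_, err := withBackoff(context.Background(), cfg, func() (struct{}, error) {
		attempts++
		if attempts < 3 {
			return struct{}{}, fmt.Errorf("transient upload failure")
		}
		return struct{}{}, nil // succeeds on the third attempt
	})
	fmt.Println(attempts, err) // 3 <nil>
}
```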
7 changes: 4 additions & 3 deletions pkg/kafka/partition/reader.go
@@ -43,7 +43,7 @@ type Reader interface {
ConsumerGroup() string
FetchLastCommittedOffset(ctx context.Context) (int64, error)
FetchPartitionOffset(ctx context.Context, position SpecialOffset) (int64, error)
Poll(ctx context.Context) ([]Record, error)
Poll(ctx context.Context, maxPollRecords int) ([]Record, error)
Commit(ctx context.Context, offset int64) error
// Set the target offset for consumption. reads will begin from here.
SetOffsetForConsumption(offset int64)
@@ -257,9 +257,10 @@ func (r *StdReader) FetchPartitionOffset(ctx context.Context, position SpecialOf
}

// Poll retrieves the next batch of records from Kafka
func (r *StdReader) Poll(ctx context.Context) ([]Record, error) {
// Number of records fetched can be limited by configuring maxPollRecords to a non-zero value.
func (r *StdReader) Poll(ctx context.Context, maxPollRecords int) ([]Record, error) {
start := time.Now()
fetches := r.client.PollFetches(ctx)
fetches := r.client.PollRecords(ctx, maxPollRecords)
r.metrics.fetchWaitDuration.Observe(time.Since(start).Seconds())

// Record metrics
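The new Poll signature forwards a record cap to franz-go: PollFetches is replaced by PollRecords, whose second argument limits how many records one call may return, while a non-positive value leaves the poll unbounded. A small hedged sketch of that franz-go call in isolation follows; the broker address and topic are placeholders, and the real reader goes through its own client setup, offset handling, and metrics.

```go
// Standalone illustration of kgo.Client.PollRecords with a per-poll cap.
// Broker and topic names are placeholders for this example.
package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.ConsumeTopics("loki.push"),    // placeholder topic
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// Fetch at most 100 records in this poll; the client may return fewer.
	fetches := client.PollRecords(context.Background(), 100)
	if errs := fetches.Errors(); len(errs) > 0 {
		fmt.Println("fetch errors:", errs)
		return
	}
	fetches.EachRecord(func(rec *kgo.Record) {
		fmt.Printf("offset=%d bytes=%d\n", rec.Offset, len(rec.Value))
	})
}
```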
4 changes: 2 additions & 2 deletions pkg/kafka/partition/reader_service.go
@@ -346,7 +346,7 @@ func (s *ReaderService) processNextFetchesUntilLagHonored(ctx context.Context, m
}

timedCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
records, err := s.reader.Poll(timedCtx)
records, err := s.reader.Poll(timedCtx, -1)
cancel()

if err != nil {
@@ -382,7 +382,7 @@ func (s *ReaderService) startFetchLoop(ctx context.Context) chan []Record {
case <-ctx.Done():
return
default:
res, err := s.reader.Poll(ctx)
res, err := s.reader.Poll(ctx, -1)
if err != nil {
level.Error(s.logger).Log("msg", "error polling records", "err", err)
continue
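The existing ReaderService call sites pass -1 for the new argument, which under franz-go's PollRecords contract disables the per-poll cap, so their behavior is expected to match the previous PollFetches-based Poll.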
1 change: 1 addition & 0 deletions pkg/loki/modules.go
@@ -1822,6 +1822,7 @@ func (t *Loki) initBlockBuilder() (services.Service, error) {
controller, err := blockbuilder.NewPartitionJobController(
reader,
t.Cfg.BlockBuilder.Backoff,
logger,
)

if err != nil {