chore: flush idle dataobjects after X seconds #16348

Merged
merged 6 commits on Feb 18, 2025
5 changes: 5 additions & 0 deletions docs/sources/shared/configuration.md
@@ -847,6 +847,11 @@ dataobj:
# CLI flag: -dataobj-consumer.sha-prefix-size
[shaprefixsize: <int> | default = 2]

# The maximum amount of time to wait in seconds before flushing an object
# that is no longer receiving new writes
# CLI flag: -dataobj-consumer.idle-flush-timeout
[idle_flush_timeout: <duration> | default = 1h]

querier:
# Enable the dataobj querier.
# CLI flag: -dataobj-querier-enabled
10 changes: 9 additions & 1 deletion pkg/dataobj/consumer/config.go
@@ -2,14 +2,16 @@ package consumer

import (
"flag"
"time"

"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/uploader"
)

type Config struct {
dataobj.BuilderConfig
UploaderConfig uploader.Config `yaml:"uploader"`
UploaderConfig uploader.Config `yaml:"uploader"`
IdleFlushTimeout time.Duration `yaml:"idle_flush_timeout"`
}

func (cfg *Config) Validate() error {
@@ -27,4 +29,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.BuilderConfig.RegisterFlagsWithPrefix(prefix, f)
cfg.UploaderConfig.RegisterFlagsWithPrefix(prefix, f)

if cfg.IdleFlushTimeout <= 0 {
cfg.IdleFlushTimeout = 60 * 60 * time.Second // default to 1 hour
}

f.DurationVar(&cfg.IdleFlushTimeout, prefix+"idle-flush-timeout", cfg.IdleFlushTimeout, "The maximum amount of time to wait in seconds before flushing an object that is no longer receiving new writes")
Contributor comment: You can set the default value as the 3rd argument in the flag definition, if you wish.
}
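
Following the contributor's note above, a minimal sketch (not part of this PR's diff) of how the registration could look with the 1h default supplied directly as the third argument to f.DurationVar, assuming the same flag name and usage string:

// Sketch of the reviewer's suggestion: the default is passed as the third
// argument to DurationVar, so the explicit <= 0 check before registration
// is no longer needed.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	cfg.BuilderConfig.RegisterFlagsWithPrefix(prefix, f)
	cfg.UploaderConfig.RegisterFlagsWithPrefix(prefix, f)

	f.DurationVar(&cfg.IdleFlushTimeout, prefix+"idle-flush-timeout", time.Hour,
		"The maximum amount of time to wait in seconds before flushing an object that is no longer receiving new writes")
}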
92 changes: 77 additions & 15 deletions pkg/dataobj/consumer/partition_processor.go
@@ -39,6 +39,10 @@ type partitionProcessor struct {
bucket objstore.Bucket
bufPool *sync.Pool

// Idle stream handling
idleFlushTimout time.Duration
Contributor comment: typo: Timout -> Timeout
lastFlush time.Time

// Metrics
metrics *partitionOffsetMetrics

@@ -50,7 +54,21 @@ type partitionProcessor struct {
logger log.Logger
}

func newPartitionProcessor(ctx context.Context, client *kgo.Client, builderCfg dataobj.BuilderConfig, uploaderCfg uploader.Config, bucket objstore.Bucket, tenantID string, virtualShard int32, topic string, partition int32, logger log.Logger, reg prometheus.Registerer, bufPool *sync.Pool) *partitionProcessor {
func newPartitionProcessor(
ctx context.Context,
client *kgo.Client,
builderCfg dataobj.BuilderConfig,
uploaderCfg uploader.Config,
bucket objstore.Bucket,
tenantID string,
virtualShard int32,
topic string,
partition int32,
logger log.Logger,
reg prometheus.Registerer,
bufPool *sync.Pool,
idleFlushTimeout time.Duration,
) *partitionProcessor {
ctx, cancel := context.WithCancel(ctx)
decoder, err := kafka.NewDecoder()
if err != nil {
@@ -95,6 +113,8 @@ func newPartitionProcessor(ctx context.Context, client *kgo.Client, builderCfg d
uploader: uploader,
metastoreManager: metastoreManager,
bufPool: bufPool,
idleFlushTimout: idleFlushTimeout,
lastFlush: time.Now(),
}
}

@@ -115,6 +135,9 @@ func (p *partitionProcessor) start() {
return
}
p.processRecord(record)

case <-time.After(p.idleFlushTimout):
p.idleFlush()
}
}
}()
@@ -163,6 +186,29 @@ func (p *partitionProcessor) initBuilder() error {
return initErr
}

func (p *partitionProcessor) flushStream(flushBuffer *bytes.Buffer) error {
flushedDataobjStats, err := p.builder.Flush(flushBuffer)
if err != nil {
level.Error(p.logger).Log("msg", "failed to flush builder", "err", err)
return err
}

objectPath, err := p.uploader.Upload(p.ctx, flushBuffer)
if err != nil {
level.Error(p.logger).Log("msg", "failed to upload object", "err", err)
return err
}

if err := p.metastoreManager.UpdateMetastore(p.ctx, objectPath, flushedDataobjStats); err != nil {
level.Error(p.logger).Log("msg", "failed to update metastore", "err", err)
return err
}

p.lastFlush = time.Now()

return nil
}

func (p *partitionProcessor) processRecord(record *kgo.Record) {
// Update offset metric at the end of processing
defer p.metrics.updateOffset(record.Offset)
@@ -200,20 +246,8 @@ func (p *partitionProcessor) processRecord(record *kgo.Record) {

flushBuffer.Reset()

flushedDataobjStats, err := p.builder.Flush(flushBuffer)
if err != nil {
level.Error(p.logger).Log("msg", "failed to flush builder", "err", err)
return
}

objectPath, err := p.uploader.Upload(p.ctx, flushBuffer)
if err != nil {
level.Error(p.logger).Log("msg", "failed to upload object", "err", err)
return
}

if err := p.metastoreManager.UpdateMetastore(p.ctx, objectPath, flushedDataobjStats); err != nil {
level.Error(p.logger).Log("msg", "failed to update metastore", "err", err)
if err := p.flushStream(flushBuffer); err != nil {
level.Error(p.logger).Log("msg", "failed to flush stream", "err", err)
return
}
}()
@@ -251,3 +285,31 @@ func (p *partitionProcessor) commitRecords(record *kgo.Record) error {
}
return lastErr
}

// idleFlush flushes the file if it has been idle for too long.
// This is used to avoid holding on to memory for too long.
// We compare the current time with the last flush time to determine if the builder has been idle.
func (p *partitionProcessor) idleFlush() {
if p.builder == nil {
return
}

now := time.Now()
if now.Sub(p.lastFlush) < p.idleFlushTimout {
Contributor comment: You could also use time.Since here: time.Since(p.lastFlush) < p.idleFlushTimeout. In practice it makes very little difference, but it's slightly easier to read.
return // Avoid checking too frequently
}

func() {
flushBuffer := p.bufPool.Get().(*bytes.Buffer)
defer p.bufPool.Put(flushBuffer)

flushBuffer.Reset()

if err := p.flushStream(flushBuffer); err != nil {
level.Error(p.logger).Log("msg", "failed to flush stream", "err", err)
return
}

p.lastFlush = now
}()
}
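
Applying the time.Since suggestion from the review comment above, a short sketch (not part of this PR's diff) of how the idle check in idleFlush could read; the field keeps the diff's current spelling idleFlushTimout, which another comment flags as a typo:

// Equivalent to now.Sub(p.lastFlush), just slightly easier to read.
if time.Since(p.lastFlush) < p.idleFlushTimout {
	return // Avoid checking too frequently
}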
2 changes: 1 addition & 1 deletion pkg/dataobj/consumer/service.go
@@ -100,7 +100,7 @@ func (s *Service) handlePartitionsAssigned(ctx context.Context, client *kgo.Clie
}

for _, partition := range parts {
processor := newPartitionProcessor(ctx, client, s.cfg.BuilderConfig, s.cfg.UploaderConfig, s.bucket, tenant, virtualShard, topic, partition, s.logger, s.reg, s.bufPool)
processor := newPartitionProcessor(ctx, client, s.cfg.BuilderConfig, s.cfg.UploaderConfig, s.bucket, tenant, virtualShard, topic, partition, s.logger, s.reg, s.bufPool, s.cfg.IdleFlushTimeout)
s.partitionHandlers[topic][partition] = processor
processor.start()
}