From 54ed0c6a278c4b1f480b2b0b57e3f00c69ae35e3 Mon Sep 17 00:00:00 2001 From: Jackson Coelho Date: Tue, 18 Feb 2025 15:27:49 +0100 Subject: [PATCH 1/6] chore: flush idle dataobjects after X seconds --- pkg/dataobj/builder.go | 9 ++ pkg/dataobj/consumer/partition_processor.go | 110 ++++++++++++++------ 2 files changed, 87 insertions(+), 32 deletions(-) diff --git a/pkg/dataobj/builder.go b/pkg/dataobj/builder.go index 245488bc1874c..87f0f1df0cdde 100644 --- a/pkg/dataobj/builder.go +++ b/pkg/dataobj/builder.go @@ -52,6 +52,10 @@ type BuilderConfig struct { // BufferSize configures the size of the buffer used to accumulate // uncompressed logs in memory prior to sorting. BufferSize flagext.Bytes `yaml:"buffer_size"` + + // TargetIdleSeconds configures the maximum amount of time to wait in seconds + // before flushing the buffer to a data object. + TargetIdleSeconds time.Duration `yaml:"target_idle_seconds"` } // RegisterFlagsWithPrefix registers flags with the given prefix. @@ -65,6 +69,7 @@ func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet f.Var(&cfg.TargetObjectSize, prefix+"target-object-size", "The size of the target object to use for the data object builder.") f.Var(&cfg.TargetSectionSize, prefix+"target-section-size", "Configures a maximum size for sections, for sections that support it.") f.Var(&cfg.BufferSize, prefix+"buffer-size", "The size of the buffer to use for sorting logs.") + f.DurationVar(&cfg.TargetIdleSeconds, prefix+"target-idle-seconds", 60*time.Second, "The maximum amount of time to wait in seconds before flushing the buffer to a data object.") } // Validate validates the BuilderConfig. @@ -89,6 +94,10 @@ func (cfg *BuilderConfig) Validate() error { errs = append(errs, errors.New("SectionSize must be greater than 0 and less than or equal to TargetObjectSize")) } + if cfg.TargetIdleSeconds <= 0 { + errs = append(errs, errors.New("TargetIdlePeriod must be greater than 0")) + } + return errors.Join(errs...) } diff --git a/pkg/dataobj/consumer/partition_processor.go b/pkg/dataobj/consumer/partition_processor.go index cf8345364b7ba..39ab80fd33dcd 100644 --- a/pkg/dataobj/consumer/partition_processor.go +++ b/pkg/dataobj/consumer/partition_processor.go @@ -39,6 +39,10 @@ type partitionProcessor struct { bucket objstore.Bucket bufPool *sync.Pool + // Idle stream handling + targetIdleSeconds time.Duration + lastFlush time.Time + // Metrics metrics *partitionOffsetMetrics @@ -79,22 +83,24 @@ func newPartitionProcessor(ctx context.Context, client *kgo.Client, builderCfg d } return &partitionProcessor{ - client: client, - logger: log.With(logger, "topic", topic, "partition", partition, "tenant", tenantID), - topic: topic, - partition: partition, - records: make(chan *kgo.Record, 1000), - ctx: ctx, - cancel: cancel, - decoder: decoder, - reg: reg, - builderCfg: builderCfg, - bucket: bucket, - tenantID: []byte(tenantID), - metrics: metrics, - uploader: uploader, - metastoreManager: metastoreManager, - bufPool: bufPool, + client: client, + logger: log.With(logger, "topic", topic, "partition", partition, "tenant", tenantID), + topic: topic, + partition: partition, + records: make(chan *kgo.Record, 1000), + ctx: ctx, + cancel: cancel, + decoder: decoder, + reg: reg, + builderCfg: builderCfg, + bucket: bucket, + tenantID: []byte(tenantID), + metrics: metrics, + uploader: uploader, + metastoreManager: metastoreManager, + bufPool: bufPool, + targetIdleSeconds: builderCfg.TargetIdleSeconds, + lastFlush: time.Now(), } } @@ -115,6 +121,9 @@ func (p *partitionProcessor) start() { return } p.processRecord(record) + + case <-time.After(p.targetIdleSeconds): + p.idleFlush() } } }() @@ -163,6 +172,31 @@ func (p *partitionProcessor) initBuilder() error { return initErr } +func (p *partitionProcessor) flushStream(flushBuffer *bytes.Buffer) error { + flushBuffer.Reset() + + flushedDataobjStats, err := p.builder.Flush(flushBuffer) + if err != nil { + level.Error(p.logger).Log("msg", "failed to flush builder", "err", err) + return err + } + + objectPath, err := p.uploader.Upload(p.ctx, flushBuffer) + if err != nil { + level.Error(p.logger).Log("msg", "failed to upload object", "err", err) + return err + } + + if err := p.metastoreManager.UpdateMetastore(p.ctx, objectPath, flushedDataobjStats); err != nil { + level.Error(p.logger).Log("msg", "failed to update metastore", "err", err) + return err + } + + p.lastFlush = time.Now() + + return nil +} + func (p *partitionProcessor) processRecord(record *kgo.Record) { // Update offset metric at the end of processing defer p.metrics.updateOffset(record.Offset) @@ -198,22 +232,8 @@ func (p *partitionProcessor) processRecord(record *kgo.Record) { flushBuffer := p.bufPool.Get().(*bytes.Buffer) defer p.bufPool.Put(flushBuffer) - flushBuffer.Reset() - - flushedDataobjStats, err := p.builder.Flush(flushBuffer) - if err != nil { - level.Error(p.logger).Log("msg", "failed to flush builder", "err", err) - return - } - - objectPath, err := p.uploader.Upload(p.ctx, flushBuffer) - if err != nil { - level.Error(p.logger).Log("msg", "failed to upload object", "err", err) - return - } - - if err := p.metastoreManager.UpdateMetastore(p.ctx, objectPath, flushedDataobjStats); err != nil { - level.Error(p.logger).Log("msg", "failed to update metastore", "err", err) + if err := p.flushStream(flushBuffer); err != nil { + level.Error(p.logger).Log("msg", "failed to flush stream", "err", err) return } }() @@ -251,3 +271,29 @@ func (p *partitionProcessor) commitRecords(record *kgo.Record) error { } return lastErr } + +// idleFlush flushes the builder if it has been idle for too long. +// This is used to avoid holding on to memory for too long. +// We compare the current time with the last flush time to determine if the builder has been idle. +func (p *partitionProcessor) idleFlush() { + if p.builder == nil { + return + } + + now := time.Now() + if now.Sub(p.lastFlush) < p.targetIdleSeconds { + return // Avoid checking too frequently + } + + func() { + flushBuffer := p.bufPool.Get().(*bytes.Buffer) + defer p.bufPool.Put(flushBuffer) + + if err := p.flushStream(flushBuffer); err != nil { + level.Error(p.logger).Log("msg", "failed to flush stream", "err", err) + return + } + + p.lastFlush = now + }() +} From cf4d834693edb2d6cdb8f601746035137f7dd055 Mon Sep 17 00:00:00 2001 From: Jackson Coelho Date: Tue, 18 Feb 2025 15:54:23 +0100 Subject: [PATCH 2/6] update the config --- pkg/dataobj/builder.go | 9 --- pkg/dataobj/consumer/config.go | 6 +- pkg/dataobj/consumer/partition_processor.go | 64 +++++++++++++-------- pkg/dataobj/consumer/service.go | 2 +- 4 files changed, 47 insertions(+), 34 deletions(-) diff --git a/pkg/dataobj/builder.go b/pkg/dataobj/builder.go index 87f0f1df0cdde..245488bc1874c 100644 --- a/pkg/dataobj/builder.go +++ b/pkg/dataobj/builder.go @@ -52,10 +52,6 @@ type BuilderConfig struct { // BufferSize configures the size of the buffer used to accumulate // uncompressed logs in memory prior to sorting. BufferSize flagext.Bytes `yaml:"buffer_size"` - - // TargetIdleSeconds configures the maximum amount of time to wait in seconds - // before flushing the buffer to a data object. - TargetIdleSeconds time.Duration `yaml:"target_idle_seconds"` } // RegisterFlagsWithPrefix registers flags with the given prefix. @@ -69,7 +65,6 @@ func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet f.Var(&cfg.TargetObjectSize, prefix+"target-object-size", "The size of the target object to use for the data object builder.") f.Var(&cfg.TargetSectionSize, prefix+"target-section-size", "Configures a maximum size for sections, for sections that support it.") f.Var(&cfg.BufferSize, prefix+"buffer-size", "The size of the buffer to use for sorting logs.") - f.DurationVar(&cfg.TargetIdleSeconds, prefix+"target-idle-seconds", 60*time.Second, "The maximum amount of time to wait in seconds before flushing the buffer to a data object.") } // Validate validates the BuilderConfig. @@ -94,10 +89,6 @@ func (cfg *BuilderConfig) Validate() error { errs = append(errs, errors.New("SectionSize must be greater than 0 and less than or equal to TargetObjectSize")) } - if cfg.TargetIdleSeconds <= 0 { - errs = append(errs, errors.New("TargetIdlePeriod must be greater than 0")) - } - return errors.Join(errs...) } diff --git a/pkg/dataobj/consumer/config.go b/pkg/dataobj/consumer/config.go index 3998e28d21d14..6c896242a7c34 100644 --- a/pkg/dataobj/consumer/config.go +++ b/pkg/dataobj/consumer/config.go @@ -2,6 +2,7 @@ package consumer import ( "flag" + "time" "github.com/grafana/loki/v3/pkg/dataobj" "github.com/grafana/loki/v3/pkg/dataobj/uploader" @@ -9,7 +10,8 @@ import ( type Config struct { dataobj.BuilderConfig - UploaderConfig uploader.Config `yaml:"uploader"` + UploaderConfig uploader.Config `yaml:"uploader"` + IdleFlushTimeout time.Duration `yaml:"idle_flush_timeout"` } func (cfg *Config) Validate() error { @@ -27,4 +29,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { cfg.BuilderConfig.RegisterFlagsWithPrefix(prefix, f) cfg.UploaderConfig.RegisterFlagsWithPrefix(prefix, f) + + f.DurationVar(&cfg.IdleFlushTimeout, prefix+"idle-flush-timeout", cfg.IdleFlushTimeout, "The maximum amount of time to wait in seconds before flushing the buffer to a data object.") } diff --git a/pkg/dataobj/consumer/partition_processor.go b/pkg/dataobj/consumer/partition_processor.go index 39ab80fd33dcd..27c56525cc52c 100644 --- a/pkg/dataobj/consumer/partition_processor.go +++ b/pkg/dataobj/consumer/partition_processor.go @@ -40,8 +40,8 @@ type partitionProcessor struct { bufPool *sync.Pool // Idle stream handling - targetIdleSeconds time.Duration - lastFlush time.Time + idleFlushTimout time.Duration + lastFlush time.Time // Metrics metrics *partitionOffsetMetrics @@ -54,7 +54,21 @@ type partitionProcessor struct { logger log.Logger } -func newPartitionProcessor(ctx context.Context, client *kgo.Client, builderCfg dataobj.BuilderConfig, uploaderCfg uploader.Config, bucket objstore.Bucket, tenantID string, virtualShard int32, topic string, partition int32, logger log.Logger, reg prometheus.Registerer, bufPool *sync.Pool) *partitionProcessor { +func newPartitionProcessor( + ctx context.Context, + client *kgo.Client, + builderCfg dataobj.BuilderConfig, + uploaderCfg uploader.Config, + bucket objstore.Bucket, + tenantID string, + virtualShard int32, + topic string, + partition int32, + logger log.Logger, + reg prometheus.Registerer, + bufPool *sync.Pool, + idleFlushTimeout time.Duration, +) *partitionProcessor { ctx, cancel := context.WithCancel(ctx) decoder, err := kafka.NewDecoder() if err != nil { @@ -82,25 +96,29 @@ func newPartitionProcessor(ctx context.Context, client *kgo.Client, builderCfg d level.Error(logger).Log("msg", "failed to register metastore manager metrics", "err", err) } + if idleFlushTimeout <= 0 { + idleFlushTimeout = 60 * 60 * time.Second // default to 1 hour + } + return &partitionProcessor{ - client: client, - logger: log.With(logger, "topic", topic, "partition", partition, "tenant", tenantID), - topic: topic, - partition: partition, - records: make(chan *kgo.Record, 1000), - ctx: ctx, - cancel: cancel, - decoder: decoder, - reg: reg, - builderCfg: builderCfg, - bucket: bucket, - tenantID: []byte(tenantID), - metrics: metrics, - uploader: uploader, - metastoreManager: metastoreManager, - bufPool: bufPool, - targetIdleSeconds: builderCfg.TargetIdleSeconds, - lastFlush: time.Now(), + client: client, + logger: log.With(logger, "topic", topic, "partition", partition, "tenant", tenantID), + topic: topic, + partition: partition, + records: make(chan *kgo.Record, 1000), + ctx: ctx, + cancel: cancel, + decoder: decoder, + reg: reg, + builderCfg: builderCfg, + bucket: bucket, + tenantID: []byte(tenantID), + metrics: metrics, + uploader: uploader, + metastoreManager: metastoreManager, + bufPool: bufPool, + idleFlushTimout: idleFlushTimeout, + lastFlush: time.Now(), } } @@ -122,7 +140,7 @@ func (p *partitionProcessor) start() { } p.processRecord(record) - case <-time.After(p.targetIdleSeconds): + case <-time.After(p.idleFlushTimout): p.idleFlush() } } @@ -281,7 +299,7 @@ func (p *partitionProcessor) idleFlush() { } now := time.Now() - if now.Sub(p.lastFlush) < p.targetIdleSeconds { + if now.Sub(p.lastFlush) < p.idleFlushTimout { return // Avoid checking too frequently } diff --git a/pkg/dataobj/consumer/service.go b/pkg/dataobj/consumer/service.go index 1c36bf2057040..08769051437a1 100644 --- a/pkg/dataobj/consumer/service.go +++ b/pkg/dataobj/consumer/service.go @@ -100,7 +100,7 @@ func (s *Service) handlePartitionsAssigned(ctx context.Context, client *kgo.Clie } for _, partition := range parts { - processor := newPartitionProcessor(ctx, client, s.cfg.BuilderConfig, s.cfg.UploaderConfig, s.bucket, tenant, virtualShard, topic, partition, s.logger, s.reg, s.bufPool) + processor := newPartitionProcessor(ctx, client, s.cfg.BuilderConfig, s.cfg.UploaderConfig, s.bucket, tenant, virtualShard, topic, partition, s.logger, s.reg, s.bufPool, s.cfg.IdleFlushTimeout) s.partitionHandlers[topic][partition] = processor processor.start() } From 730b2e6d2bf08bbf9d09a0416f9464875a0669ea Mon Sep 17 00:00:00 2001 From: Jackson Coelho Date: Tue, 18 Feb 2025 16:03:28 +0100 Subject: [PATCH 3/6] update docs --- docs/sources/shared/configuration.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index d75d6a4307ae8..3e82740b06b3b 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -847,6 +847,11 @@ dataobj: # CLI flag: -dataobj-consumer.sha-prefix-size [shaprefixsize: | default = 2] + # The maximum amount of time to wait in seconds before flushing the buffer + # to a data object. + # CLI flag: -dataobj-consumer.idle-flush-timeout + [idle_flush_timeout: | default = 0s] + querier: # Enable the dataobj querier. # CLI flag: -dataobj-querier-enabled From 933e13911185e32c36ba944743f64f38c8141d87 Mon Sep 17 00:00:00 2001 From: Jackson Coelho Date: Tue, 18 Feb 2025 16:08:00 +0100 Subject: [PATCH 4/6] update comments --- pkg/dataobj/consumer/partition_processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/dataobj/consumer/partition_processor.go b/pkg/dataobj/consumer/partition_processor.go index 27c56525cc52c..39bfaa60d00db 100644 --- a/pkg/dataobj/consumer/partition_processor.go +++ b/pkg/dataobj/consumer/partition_processor.go @@ -290,7 +290,7 @@ func (p *partitionProcessor) commitRecords(record *kgo.Record) error { return lastErr } -// idleFlush flushes the builder if it has been idle for too long. +// idleFlush flushes the file if it has been idle for too long. // This is used to avoid holding on to memory for too long. // We compare the current time with the last flush time to determine if the builder has been idle. func (p *partitionProcessor) idleFlush() { From bb907698cb1f0c2dfa5354775ccac9b32d4aae19 Mon Sep 17 00:00:00 2001 From: Jackson Coelho Date: Tue, 18 Feb 2025 17:06:24 +0100 Subject: [PATCH 5/6] fixes --- pkg/dataobj/consumer/config.go | 4 ++++ pkg/dataobj/consumer/partition_processor.go | 10 ++++------ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/pkg/dataobj/consumer/config.go b/pkg/dataobj/consumer/config.go index 6c896242a7c34..79f83c29de1f8 100644 --- a/pkg/dataobj/consumer/config.go +++ b/pkg/dataobj/consumer/config.go @@ -30,5 +30,9 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { cfg.BuilderConfig.RegisterFlagsWithPrefix(prefix, f) cfg.UploaderConfig.RegisterFlagsWithPrefix(prefix, f) + if cfg.IdleFlushTimeout <= 0 { + cfg.IdleFlushTimeout = 60 * 60 * time.Second // default to 1 hour + } + f.DurationVar(&cfg.IdleFlushTimeout, prefix+"idle-flush-timeout", cfg.IdleFlushTimeout, "The maximum amount of time to wait in seconds before flushing the buffer to a data object.") } diff --git a/pkg/dataobj/consumer/partition_processor.go b/pkg/dataobj/consumer/partition_processor.go index 39bfaa60d00db..b9030b354ed85 100644 --- a/pkg/dataobj/consumer/partition_processor.go +++ b/pkg/dataobj/consumer/partition_processor.go @@ -96,10 +96,6 @@ func newPartitionProcessor( level.Error(logger).Log("msg", "failed to register metastore manager metrics", "err", err) } - if idleFlushTimeout <= 0 { - idleFlushTimeout = 60 * 60 * time.Second // default to 1 hour - } - return &partitionProcessor{ client: client, logger: log.With(logger, "topic", topic, "partition", partition, "tenant", tenantID), @@ -191,8 +187,6 @@ func (p *partitionProcessor) initBuilder() error { } func (p *partitionProcessor) flushStream(flushBuffer *bytes.Buffer) error { - flushBuffer.Reset() - flushedDataobjStats, err := p.builder.Flush(flushBuffer) if err != nil { level.Error(p.logger).Log("msg", "failed to flush builder", "err", err) @@ -250,6 +244,8 @@ func (p *partitionProcessor) processRecord(record *kgo.Record) { flushBuffer := p.bufPool.Get().(*bytes.Buffer) defer p.bufPool.Put(flushBuffer) + flushBuffer.Reset() + if err := p.flushStream(flushBuffer); err != nil { level.Error(p.logger).Log("msg", "failed to flush stream", "err", err) return @@ -307,6 +303,8 @@ func (p *partitionProcessor) idleFlush() { flushBuffer := p.bufPool.Get().(*bytes.Buffer) defer p.bufPool.Put(flushBuffer) + flushBuffer.Reset() + if err := p.flushStream(flushBuffer); err != nil { level.Error(p.logger).Log("msg", "failed to flush stream", "err", err) return From 8ac6adca4b4afb3f199aaaeea14941f1e6dd9d6f Mon Sep 17 00:00:00 2001 From: Jackson Coelho Date: Tue, 18 Feb 2025 17:14:41 +0100 Subject: [PATCH 6/6] update flag description --- docs/sources/shared/configuration.md | 6 +++--- pkg/dataobj/consumer/config.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 3e82740b06b3b..4cb8c9b108f3e 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -847,10 +847,10 @@ dataobj: # CLI flag: -dataobj-consumer.sha-prefix-size [shaprefixsize: | default = 2] - # The maximum amount of time to wait in seconds before flushing the buffer - # to a data object. + # The maximum amount of time to wait in seconds before flushing an object + # that is no longer receiving new writes # CLI flag: -dataobj-consumer.idle-flush-timeout - [idle_flush_timeout: | default = 0s] + [idle_flush_timeout: | default = 1h] querier: # Enable the dataobj querier. diff --git a/pkg/dataobj/consumer/config.go b/pkg/dataobj/consumer/config.go index 79f83c29de1f8..b76f9b6672d12 100644 --- a/pkg/dataobj/consumer/config.go +++ b/pkg/dataobj/consumer/config.go @@ -34,5 +34,5 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { cfg.IdleFlushTimeout = 60 * 60 * time.Second // default to 1 hour } - f.DurationVar(&cfg.IdleFlushTimeout, prefix+"idle-flush-timeout", cfg.IdleFlushTimeout, "The maximum amount of time to wait in seconds before flushing the buffer to a data object.") + f.DurationVar(&cfg.IdleFlushTimeout, prefix+"idle-flush-timeout", cfg.IdleFlushTimeout, "The maximum amount of time to wait in seconds before flushing an object that is no longer receiving new writes") }