diff --git a/pkg/dataobj/builder.go b/pkg/dataobj/builder.go new file mode 100644 index 0000000000000..ba0391b9298f2 --- /dev/null +++ b/pkg/dataobj/builder.go @@ -0,0 +1,363 @@ +package dataobj + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "errors" + "flag" + "fmt" + + "github.com/grafana/dskit/flagext" + lru "github.com/hashicorp/golang-lru/v2" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/thanos-io/objstore" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/encoding" + "github.com/grafana/loki/v3/pkg/dataobj/internal/sections/logs" + "github.com/grafana/loki/v3/pkg/dataobj/internal/sections/streams" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/syntax" +) + +// ErrBufferFull is returned by [Builder.Append] when the buffer is full and +// needs to flush; call [Builder.Flush] to flush it. +var ErrBufferFull = errors.New("buffer full") + +// BuilderConfig configures a data object [Builder]. +type BuilderConfig struct { + // SHAPrefixSize sets the number of bytes of the SHA filename to use as a + // folder path. + SHAPrefixSize int `yaml:"sha_prefix_size"` + + // TargetPageSize configures a target size for encoded pages within the data + // object. TargetPageSize accounts for encoding, but not for compression. + TargetPageSize flagext.Bytes `yaml:"target_page_size"` + + // TODO(rfratto): We need an additional parameter for TargetMetadataSize, as + // metadata payloads can't be split and must be downloaded in a single + // request. + // + // At the moment, we don't have a good mechanism for implementing a metadata + // size limit (we need to support some form of section splitting or column + // combinations), so the option is omitted for now. + + // TargetObjectSize configures a target size for data objects. + TargetObjectSize flagext.Bytes `yaml:"target_object_size"` + + // TargetSectionSize configures the maximum size of data in a section. Sections + // which support this parameter will place overflow data into new sections of + // the same type. + TargetSectionSize flagext.Bytes `yaml:"target_section_size"` + + // BufferSize configures the size of the buffer used to accumulate + // uncompressed logs in memory prior to sorting. + BufferSize flagext.Bytes `yaml:"buffer_size"` +} + +// RegisterFlagsWithPrefix registers flags with the given prefix. +func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + _ = cfg.TargetPageSize.Set("2MB") + _ = cfg.TargetObjectSize.Set("1GB") + _ = cfg.BufferSize.Set("16MB") // Page Size * 8 + _ = cfg.TargetSectionSize.Set("128MB") // Target Object Size / 8 + + f.IntVar(&cfg.SHAPrefixSize, prefix+"sha-prefix-size", 2, "The size of the SHA prefix to use for the data object builder.") + f.Var(&cfg.TargetPageSize, prefix+"target-page-size", "The size of the target page to use for the data object builder.") + f.Var(&cfg.TargetObjectSize, prefix+"target-object-size", "The size of the target object to use for the data object builder.") + f.Var(&cfg.TargetSectionSize, prefix+"target-section-size", "Configures a maximum size for sections, for sections that support it.") + f.Var(&cfg.BufferSize, prefix+"buffer-size", "The size of the buffer to use for sorting logs.") +} + +// Validate validates the BuilderConfig. 
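As an illustrative aside (not part of the patch): the defaults registered above interact with the Validate rules that follow roughly like this. The flag prefix and the 1MB override are hypothetical values chosen only to trip one of the checks.

```go
package main

import (
	"flag"
	"fmt"

	"github.com/grafana/loki/v3/pkg/dataobj"
)

func main() {
	var cfg dataobj.BuilderConfig
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.RegisterFlagsWithPrefix("dataobj-builder.", fs)

	// The registered defaults (2MB pages, 1GB objects, 16MB buffer,
	// 128MB sections, SHA prefix of 2) pass validation.
	fmt.Println(cfg.Validate()) // <nil>

	// Shrinking the object target below the page target trips the
	// "TargetPageSize must be less than TargetObjectSize" rule.
	_ = cfg.TargetObjectSize.Set("1MB")
	fmt.Println(cfg.Validate() != nil) // true
}
```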
+func (cfg *BuilderConfig) Validate() error { + var errs []error + + if cfg.SHAPrefixSize <= 0 { + errs = append(errs, errors.New("SHAPrefixSize must be greater than 0")) + } + + if cfg.TargetPageSize <= 0 { + errs = append(errs, errors.New("TargetPageSize must be greater than 0")) + } else if cfg.TargetPageSize >= cfg.TargetObjectSize { + errs = append(errs, errors.New("TargetPageSize must be less than TargetObjectSize")) + } + + if cfg.TargetObjectSize <= 0 { + errs = append(errs, errors.New("TargetObjectSize must be greater than 0")) + } + + if cfg.BufferSize <= 0 { + errs = append(errs, errors.New("BufferSize must be greater than 0")) + } + + if cfg.TargetSectionSize <= 0 || cfg.TargetSectionSize > cfg.TargetObjectSize { + errs = append(errs, errors.New("SectionSize must be greater than 0 and less than or equal to TargetObjectSize")) + } + + return errors.Join(errs...) +} + +// A Builder builds data objects from a set of incoming log data. Log data is +// appended to a builder by calling [Builder.Append]. Buffered log data is +// flushed manually by calling [Builder.Flush]. +// +// Methods on Builder are not goroutine-safe; callers are responsible for +// synchronizing calls. +type Builder struct { + cfg BuilderConfig + metrics *metrics + bucket objstore.Bucket + tenantID string + + labelCache *lru.Cache[string, labels.Labels] + + currentSizeEstimate int + state builderState + + streams *streams.Streams + logs *logs.Logs + + flushBuffer *bytes.Buffer + encoder *encoding.Encoder +} + +type builderState int + +const ( + // builderStateReady indicates the builder is empty and ready to accept new data. + builderStateEmpty builderState = iota + + // builderStateDirty indicates the builder has been modified since the last flush. + builderStateDirty + + // builderStateFlushing indicates the builder has data to flush. + builderStateFlush +) + +// NewBuilder creates a new Builder which stores data objects for the specified +// tenant in a bucket. +// +// NewBuilder returns an error if BuilderConfig is invalid. +func NewBuilder(cfg BuilderConfig, bucket objstore.Bucket, tenantID string) (*Builder, error) { + if err := cfg.Validate(); err != nil { + return nil, err + } + + labelCache, err := lru.New[string, labels.Labels](5000) + if err != nil { + return nil, fmt.Errorf("failed to create LRU cache: %w", err) + } + + var ( + metrics = newMetrics() + + flushBuffer = bytes.NewBuffer(make([]byte, 0, int(cfg.TargetObjectSize))) + encoder = encoding.NewEncoder(flushBuffer) + ) + metrics.ObserveConfig(cfg) + + return &Builder{ + cfg: cfg, + metrics: metrics, + bucket: bucket, + tenantID: tenantID, + + labelCache: labelCache, + + streams: streams.New(metrics.streams, int(cfg.TargetPageSize)), + logs: logs.New(metrics.logs, logs.Options{ + PageSizeHint: int(cfg.TargetPageSize), + BufferSize: int(cfg.BufferSize), + SectionSize: int(cfg.TargetSectionSize), + }), + + flushBuffer: flushBuffer, + encoder: encoder, + }, nil +} + +// Append buffers a stream to be written to a data object. Append returns an +// error if the stream labels cannot be parsed or [ErrBufferFull] if the +// builder is full. +// +// Once a Builder is full, call [Builder.Flush] to flush the buffered data, +// then call Append again with the same entry. +func (b *Builder) Append(stream logproto.Stream) error { + // Don't allow appending to a builder that has data to be flushed. 
+ if b.state == builderStateFlush { + return ErrBufferFull + } + + ls, err := b.parseLabels(stream.Labels) + if err != nil { + return err + } + + // Check whether the buffer is full before a stream can be appended; this is + // tends to overestimate, but we may still go over our target size. + // + // Since this check only happens after the first call to Append, + // b.currentSizeEstimate will always be updated to reflect the size following + // the previous append. + if b.state != builderStateEmpty && b.currentSizeEstimate+labelsEstimate(ls)+streamSizeEstimate(stream) > int(b.cfg.TargetObjectSize) { + return ErrBufferFull + } + + timer := prometheus.NewTimer(b.metrics.appendTime) + defer timer.ObserveDuration() + + for _, entry := range stream.Entries { + streamID := b.streams.Record(ls, entry.Timestamp) + + b.logs.Append(logs.Record{ + StreamID: streamID, + Timestamp: entry.Timestamp, + Metadata: entry.StructuredMetadata, + Line: entry.Line, + }) + } + + b.currentSizeEstimate = b.estimatedSize() + b.state = builderStateDirty + return nil +} + +func (b *Builder) parseLabels(labelString string) (labels.Labels, error) { + labels, ok := b.labelCache.Get(labelString) + if ok { + return labels, nil + } + + labels, err := syntax.ParseLabels(labelString) + if err != nil { + return nil, fmt.Errorf("failed to parse labels: %w", err) + } + b.labelCache.Add(labelString, labels) + return labels, nil +} + +func (b *Builder) estimatedSize() int { + var size int + size += b.streams.EstimatedSize() + size += b.logs.EstimatedSize() + b.metrics.sizeEstimate.Set(float64(size)) + return size +} + +// labelsEstimate estimates the size of a set of labels in bytes. +func labelsEstimate(ls labels.Labels) int { + var ( + keysSize int + valuesSize int + ) + + for _, l := range ls { + keysSize += len(l.Name) + valuesSize += len(l.Value) + } + + // Keys are stored as columns directly, while values get compressed. We'll + // underestimate a 2x compression ratio. + return keysSize + valuesSize/2 +} + +// streamSizeEstimate estimates the size of a stream in bytes. +func streamSizeEstimate(stream logproto.Stream) int { + var size int + for _, entry := range stream.Entries { + // We only check the size of the line and metadata. Timestamps and IDs + // encode so well that they're unlikely to make a singificant impact on our + // size estimate. + size += len(entry.Line) / 2 // Line with 2x compression ratio + for _, md := range entry.StructuredMetadata { + size += len(md.Name) + len(md.Value)/2 + } + } + return size +} + +// Flush flushes all buffered data to object storage. Calling Flush can result +// in a no-op if there is no buffered data to flush. +// +// If Flush builds an object but fails to upload it to object storage, the +// built object is cached and can be retried. [Builder.Reset] can be called to +// discard any pending data and allow new data to be appended. 
+func (b *Builder) Flush(ctx context.Context) error { + switch b.state { + case builderStateEmpty: + return nil // Nothing to flush + case builderStateDirty: + if err := b.buildObject(); err != nil { + return fmt.Errorf("building object: %w", err) + } + b.state = builderStateFlush + } + + timer := prometheus.NewTimer(b.metrics.flushTime) + defer timer.ObserveDuration() + + sum := sha256.Sum224(b.flushBuffer.Bytes()) + sumStr := hex.EncodeToString(sum[:]) + + objectPath := fmt.Sprintf("tenant-%s/objects/%s/%s", b.tenantID, sumStr[:b.cfg.SHAPrefixSize], sumStr[b.cfg.SHAPrefixSize:]) + if err := b.bucket.Upload(ctx, objectPath, bytes.NewReader(b.flushBuffer.Bytes())); err != nil { + return err + } + + b.Reset() + return nil +} + +func (b *Builder) buildObject() error { + timer := prometheus.NewTimer(b.metrics.buildTime) + defer timer.ObserveDuration() + + // We reset after a successful flush, but we also reset the buffer before + // building for safety. + b.flushBuffer.Reset() + + if err := b.streams.EncodeTo(b.encoder); err != nil { + return fmt.Errorf("encoding streams: %w", err) + } else if err := b.logs.EncodeTo(b.encoder); err != nil { + return fmt.Errorf("encoding logs: %w", err) + } else if err := b.encoder.Flush(); err != nil { + return fmt.Errorf("encoding object: %w", err) + } + + b.metrics.builtSize.Observe(float64(b.flushBuffer.Len())) + + // We pass context.Background() below to avoid allowing building an object to + // time out; timing out on build would discard anything we built and would + // cause data loss. + dec := encoding.ReaderAtDecoder(bytes.NewReader(b.flushBuffer.Bytes()), int64(b.flushBuffer.Len())) + return b.metrics.encoding.Observe(context.Background(), dec) +} + +// Reset discards pending data and resets the builder to an empty state. +func (b *Builder) Reset() { + b.logs.Reset() + b.streams.Reset() + + b.state = builderStateEmpty + b.flushBuffer.Reset() + b.metrics.sizeEstimate.Set(0) +} + +// RegisterMetrics registers metrics about builder to report to reg. All +// metrics will have a tenant label set to the tenant ID of the Builder. +// +// If multiple Builders for the same tenant are running in the same process, +// reg must contain additional labels to differentiate between them. +func (b *Builder) RegisterMetrics(reg prometheus.Registerer) error { + reg = prometheus.WrapRegistererWith(prometheus.Labels{"tenant": b.tenantID}, reg) + return b.metrics.Register(reg) +} + +// UnregisterMetrics unregisters metrics about builder from reg. 
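A hedged usage sketch of the Append/Flush contract documented above; appendOrFlush and its error handling are illustrative only, not an API introduced by this change.

```go
package example

import (
	"context"
	"errors"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/logproto"
)

// appendOrFlush follows the documented contract: Append until the builder
// reports ErrBufferFull, Flush the built object to storage, then retry the
// same stream against the now-empty builder.
func appendOrFlush(ctx context.Context, b *dataobj.Builder, stream logproto.Stream) error {
	err := b.Append(stream)
	if !errors.Is(err, dataobj.ErrBufferFull) {
		// nil on success, or a real failure such as unparsable labels.
		return err
	}
	if err := b.Flush(ctx); err != nil {
		return err
	}
	return b.Append(stream)
}
```

If the upload inside Flush fails, the built object stays cached so the flush can be retried; only Reset discards it, per the doc comment above.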
+func (b *Builder) UnregisterMetrics(reg prometheus.Registerer) { + reg = prometheus.WrapRegistererWith(prometheus.Labels{"tenant": b.tenantID}, reg) + b.metrics.Unregister(reg) +} diff --git a/pkg/dataobj/dataobj_test.go b/pkg/dataobj/builder_test.go similarity index 62% rename from pkg/dataobj/dataobj_test.go rename to pkg/dataobj/builder_test.go index 6c75d722ae5c8..365f6a5d6196f 100644 --- a/pkg/dataobj/dataobj_test.go +++ b/pkg/dataobj/builder_test.go @@ -1,10 +1,9 @@ package dataobj import ( - "cmp" "context" "errors" - "slices" + "fmt" "strings" "testing" "time" @@ -16,7 +15,6 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj/internal/result" "github.com/grafana/loki/v3/pkg/logproto" - "github.com/grafana/loki/v3/pkg/logql/syntax" ) var testBuilderConfig = BuilderConfig{ @@ -29,7 +27,7 @@ var testBuilderConfig = BuilderConfig{ BufferSize: 2048 * 8, } -func Test(t *testing.T) { +func TestBuilder(t *testing.T) { bucket := objstore.NewInMemBucket() streams := []logproto.Stream{ @@ -87,21 +85,21 @@ func Test(t *testing.T) { }) t.Run("Read", func(t *testing.T) { - reader := newReader(bucket) - - objects, err := result.Collect(reader.Objects(context.Background(), "fake")) + objects, err := result.Collect(listObjects(context.Background(), bucket, "fake")) require.NoError(t, err) require.Len(t, objects, 1) - actual, err := result.Collect(reader.Streams(context.Background(), objects[0])) + obj := FromBucket(bucket, objects[0]) + md, err := obj.Metadata(context.Background()) require.NoError(t, err) - require.Equal(t, sortStreams(t, streams), actual) + require.Equal(t, 1, md.StreamsSections) + require.Equal(t, 1, md.LogsSections) }) } -// Test_Builder_Append ensures that appending to the buffer eventually reports +// TestBuilder_Append ensures that appending to the buffer eventually reports // that the buffer is full. -func Test_Builder_Append(t *testing.T) { +func TestBuilder_Append(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() @@ -127,43 +125,24 @@ func Test_Builder_Append(t *testing.T) { } } -// sortStreams returns a new slice of streams where entries in individual -// streams are sorted by timestamp and structured metadata are sorted by key. -// The order of streams is preserved. 
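The path layout written by Flush above (and iterated by the new listObjects test helper below) can be summarized with a small sketch; objectPath is a hypothetical helper that simply mirrors the Sprintf in Flush.

```go
package example

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// objectPath mirrors the path built in Builder.Flush: a SHA-224 digest of the
// encoded object, hex-encoded, with the first SHAPrefixSize characters split
// off as a fan-out directory under the tenant prefix.
func objectPath(tenantID string, shaPrefixSize int, object []byte) string {
	sum := sha256.Sum224(object)
	sumStr := hex.EncodeToString(sum[:])
	return fmt.Sprintf("tenant-%s/objects/%s/%s",
		tenantID, sumStr[:shaPrefixSize], sumStr[shaPrefixSize:])
}
```

Note that the prefix is sliced from the hex string, so the default SHAPrefixSize of 2 corresponds to two hex characters (one byte of the digest), even though the config comment describes the setting in bytes.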
-func sortStreams(t *testing.T, streams []logproto.Stream) []logproto.Stream { - t.Helper() - - res := make([]logproto.Stream, len(streams)) - for i, in := range streams { - labels, err := syntax.ParseLabels(in.Labels) - require.NoError(t, err) - - res[i] = logproto.Stream{ - Labels: labels.String(), - Entries: slices.Clone(in.Entries), - Hash: labels.Hash(), - } - - for j, ent := range res[i].Entries { - res[i].Entries[j].StructuredMetadata = slices.Clone(ent.StructuredMetadata) - slices.SortFunc(res[i].Entries[j].StructuredMetadata, func(i, j push.LabelAdapter) int { - return cmp.Compare(i.Name, j.Name) - }) - } +func listObjects(ctx context.Context, bucket objstore.Bucket, tenant string) result.Seq[string] { + tenantPath := fmt.Sprintf("tenant-%s/objects/", tenant) - slices.SortFunc(res[i].Entries, func(i, j push.Entry) int { - switch { - case i.Timestamp.Before(j.Timestamp): - return -1 + return result.Iter(func(yield func(string) bool) error { + errIterationStopped := errors.New("iteration stopped") - case i.Timestamp.After(j.Timestamp): - return 1 - - default: - return 0 + err := bucket.Iter(ctx, tenantPath, func(name string) error { + if !yield(name) { + return errIterationStopped } - }) - } - - return res + return nil + }, objstore.WithRecursiveIter()) + + switch { + case errors.Is(err, errIterationStopped): + return nil + default: + return err + } + }) } diff --git a/pkg/dataobj/dataobj.go b/pkg/dataobj/dataobj.go index 62ce8c0a16c8f..23cb094f0db5a 100644 --- a/pkg/dataobj/dataobj.go +++ b/pkg/dataobj/dataobj.go @@ -2,363 +2,54 @@ package dataobj import ( - "bytes" "context" - "crypto/sha256" - "encoding/hex" - "errors" - "flag" "fmt" + "io" - "github.com/grafana/dskit/flagext" - lru "github.com/hashicorp/golang-lru/v2" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/prometheus/model/labels" "github.com/thanos-io/objstore" "github.com/grafana/loki/v3/pkg/dataobj/internal/encoding" - "github.com/grafana/loki/v3/pkg/dataobj/internal/sections/logs" - "github.com/grafana/loki/v3/pkg/dataobj/internal/sections/streams" - "github.com/grafana/loki/v3/pkg/logproto" - "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd" ) -// ErrBufferFull is returned by [Builder.Append] when the buffer is full and -// needs to flush; call [Builder.Flush] to flush it. -var ErrBufferFull = errors.New("buffer full") - -// BuilderConfig configures a data object [Builder]. -type BuilderConfig struct { - // SHAPrefixSize sets the number of bytes of the SHA filename to use as a - // folder path. - SHAPrefixSize int `yaml:"sha_prefix_size"` - - // TargetPageSize configures a target size for encoded pages within the data - // object. TargetPageSize accounts for encoding, but not for compression. - TargetPageSize flagext.Bytes `yaml:"target_page_size"` - - // TODO(rfratto): We need an additional parameter for TargetMetadataSize, as - // metadata payloads can't be split and must be downloaded in a single - // request. - // - // At the moment, we don't have a good mechanism for implementing a metadata - // size limit (we need to support some form of section splitting or column - // combinations), so the option is omitted for now. - - // TargetObjectSize configures a target size for data objects. - TargetObjectSize flagext.Bytes `yaml:"target_object_size"` - - // TargetSectionSize configures the maximum size of data in a section. 
Sections - // which support this parameter will place overflow data into new sections of - // the same type. - TargetSectionSize flagext.Bytes `yaml:"target_section_size"` - - // BufferSize configures the size of the buffer used to accumulate - // uncompressed logs in memory prior to sorting. - BufferSize flagext.Bytes `yaml:"buffer_size"` +// An Object is a representation of a data object. +type Object struct { + dec encoding.Decoder } -// RegisterFlagsWithPrefix registers flags with the given prefix. -func (cfg *BuilderConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - _ = cfg.TargetPageSize.Set("2MB") - _ = cfg.TargetObjectSize.Set("1GB") - _ = cfg.BufferSize.Set("16MB") // Page Size * 8 - _ = cfg.TargetSectionSize.Set("128MB") // Target Object Size / 8 - - f.IntVar(&cfg.SHAPrefixSize, prefix+"sha-prefix-size", 2, "The size of the SHA prefix to use for the data object builder.") - f.Var(&cfg.TargetPageSize, prefix+"target-page-size", "The size of the target page to use for the data object builder.") - f.Var(&cfg.TargetObjectSize, prefix+"target-object-size", "The size of the target object to use for the data object builder.") - f.Var(&cfg.TargetSectionSize, prefix+"target-section-size", "Configures a maximum size for sections, for sections that support it.") - f.Var(&cfg.BufferSize, prefix+"buffer-size", "The size of the buffer to use for sorting logs.") +// FromBucket opens an Object from the given storage bucket and path. +func FromBucket(bucket objstore.Bucket, path string) *Object { + return &Object{dec: encoding.BucketDecoder(bucket, path)} } -// Validate validates the BuilderConfig. -func (cfg *BuilderConfig) Validate() error { - var errs []error - - if cfg.SHAPrefixSize <= 0 { - errs = append(errs, errors.New("SHAPrefixSize must be greater than 0")) - } - - if cfg.TargetPageSize <= 0 { - errs = append(errs, errors.New("TargetPageSize must be greater than 0")) - } else if cfg.TargetPageSize >= cfg.TargetObjectSize { - errs = append(errs, errors.New("TargetPageSize must be less than TargetObjectSize")) - } - - if cfg.TargetObjectSize <= 0 { - errs = append(errs, errors.New("TargetObjectSize must be greater than 0")) - } - - if cfg.BufferSize <= 0 { - errs = append(errs, errors.New("BufferSize must be greater than 0")) - } - - if cfg.TargetSectionSize <= 0 || cfg.TargetSectionSize > cfg.TargetObjectSize { - errs = append(errs, errors.New("SectionSize must be greater than 0 and less than or equal to TargetObjectSize")) - } - - return errors.Join(errs...) +// FromReadSeeker opens an Object from the given ReaderAt. The size argument +// specifies the size of the data object in bytes. +func FromReaderAt(r io.ReaderAt, size int64) *Object { + return &Object{dec: encoding.ReaderAtDecoder(r, size)} } -// A Builder builds data objects from a set of incoming log data. Log data is -// appended to a builder by calling [Builder.Append]. Buffered log data is -// flushed manually by calling [Builder.Flush]. -// -// Methods on Builder are not goroutine-safe; callers are responsible for -// synchronizing calls. -type Builder struct { - cfg BuilderConfig - metrics *metrics - bucket objstore.Bucket - tenantID string - - labelCache *lru.Cache[string, labels.Labels] - - currentSizeEstimate int - state builderState - - streams *streams.Streams - logs *logs.Logs - - flushBuffer *bytes.Buffer - encoder *encoding.Encoder +// Metadata holds high-level metadata about an [Object]. +type Metadata struct { + StreamsSections int // Number of streams sections in the Object. 
+ LogsSections int // Number of logs sections in the Object. } -type builderState int - -const ( - // builderStateReady indicates the builder is empty and ready to accept new data. - builderStateEmpty builderState = iota - - // builderStateDirty indicates the builder has been modified since the last flush. - builderStateDirty - - // builderStateFlushing indicates the builder has data to flush. - builderStateFlush -) - -// NewBuilder creates a new Builder which stores data objects for the specified -// tenant in a bucket. -// -// NewBuilder returns an error if BuilderConfig is invalid. -func NewBuilder(cfg BuilderConfig, bucket objstore.Bucket, tenantID string) (*Builder, error) { - if err := cfg.Validate(); err != nil { - return nil, err - } - - labelCache, err := lru.New[string, labels.Labels](5000) +// Metadata returns the metadata of the Object. Metadata returns an error if +// the object cannot be read. +func (o *Object) Metadata(ctx context.Context) (Metadata, error) { + si, err := o.dec.Sections(ctx) if err != nil { - return nil, fmt.Errorf("failed to create LRU cache: %w", err) - } - - var ( - metrics = newMetrics() - - flushBuffer = bytes.NewBuffer(make([]byte, 0, int(cfg.TargetObjectSize))) - encoder = encoding.NewEncoder(flushBuffer) - ) - metrics.ObserveConfig(cfg) - - return &Builder{ - cfg: cfg, - metrics: metrics, - bucket: bucket, - tenantID: tenantID, - - labelCache: labelCache, - - streams: streams.New(metrics.streams, int(cfg.TargetPageSize)), - logs: logs.New(metrics.logs, logs.Options{ - PageSizeHint: int(cfg.TargetPageSize), - BufferSize: int(cfg.BufferSize), - SectionSize: int(cfg.TargetSectionSize), - }), - - flushBuffer: flushBuffer, - encoder: encoder, - }, nil -} - -// Append buffers a stream to be written to a data object. Append returns an -// error if the stream labels cannot be parsed or [ErrBufferFull] if the -// builder is full. -// -// Once a Builder is full, call [Builder.Flush] to flush the buffered data, -// then call Append again with the same entry. -func (b *Builder) Append(stream logproto.Stream) error { - // Don't allow appending to a builder that has data to be flushed. - if b.state == builderStateFlush { - return ErrBufferFull - } - - ls, err := b.parseLabels(stream.Labels) - if err != nil { - return err - } - - // Check whether the buffer is full before a stream can be appended; this is - // tends to overestimate, but we may still go over our target size. - // - // Since this check only happens after the first call to Append, - // b.currentSizeEstimate will always be updated to reflect the size following - // the previous append. 
- if b.state != builderStateEmpty && b.currentSizeEstimate+labelsEstimate(ls)+streamSizeEstimate(stream) > int(b.cfg.TargetObjectSize) { - return ErrBufferFull + return Metadata{}, fmt.Errorf("reading sections: %w", err) } - timer := prometheus.NewTimer(b.metrics.appendTime) - defer timer.ObserveDuration() - - for _, entry := range stream.Entries { - streamID := b.streams.Record(ls, entry.Timestamp) - - b.logs.Append(logs.Record{ - StreamID: streamID, - Timestamp: entry.Timestamp, - Metadata: entry.StructuredMetadata, - Line: entry.Line, - }) - } - - b.currentSizeEstimate = b.estimatedSize() - b.state = builderStateDirty - return nil -} - -func (b *Builder) parseLabels(labelString string) (labels.Labels, error) { - labels, ok := b.labelCache.Get(labelString) - if ok { - return labels, nil - } - - labels, err := syntax.ParseLabels(labelString) - if err != nil { - return nil, fmt.Errorf("failed to parse labels: %w", err) - } - b.labelCache.Add(labelString, labels) - return labels, nil -} - -func (b *Builder) estimatedSize() int { - var size int - size += b.streams.EstimatedSize() - size += b.logs.EstimatedSize() - b.metrics.sizeEstimate.Set(float64(size)) - return size -} - -// labelsEstimate estimates the size of a set of labels in bytes. -func labelsEstimate(ls labels.Labels) int { - var ( - keysSize int - valuesSize int - ) - - for _, l := range ls { - keysSize += len(l.Name) - valuesSize += len(l.Value) - } - - // Keys are stored as columns directly, while values get compressed. We'll - // underestimate a 2x compression ratio. - return keysSize + valuesSize/2 -} - -// streamSizeEstimate estimates the size of a stream in bytes. -func streamSizeEstimate(stream logproto.Stream) int { - var size int - for _, entry := range stream.Entries { - // We only check the size of the line and metadata. Timestamps and IDs - // encode so well that they're unlikely to make a singificant impact on our - // size estimate. - size += len(entry.Line) / 2 // Line with 2x compression ratio - for _, md := range entry.StructuredMetadata { - size += len(md.Name) + len(md.Value)/2 - } - } - return size -} - -// Flush flushes all buffered data to object storage. Calling Flush can result -// in a no-op if there is no buffered data to flush. -// -// If Flush builds an object but fails to upload it to object storage, the -// built object is cached and can be retried. [Builder.Reset] can be called to -// discard any pending data and allow new data to be appended. 
-func (b *Builder) Flush(ctx context.Context) error { - switch b.state { - case builderStateEmpty: - return nil // Nothing to flush - case builderStateDirty: - if err := b.buildObject(); err != nil { - return fmt.Errorf("building object: %w", err) + var md Metadata + for _, s := range si { + switch s.Type { + case filemd.SECTION_TYPE_STREAMS: + md.StreamsSections++ + case filemd.SECTION_TYPE_LOGS: + md.LogsSections++ } - b.state = builderStateFlush - } - - timer := prometheus.NewTimer(b.metrics.flushTime) - defer timer.ObserveDuration() - - sum := sha256.Sum224(b.flushBuffer.Bytes()) - sumStr := hex.EncodeToString(sum[:]) - - objectPath := fmt.Sprintf("tenant-%s/objects/%s/%s", b.tenantID, sumStr[:b.cfg.SHAPrefixSize], sumStr[b.cfg.SHAPrefixSize:]) - if err := b.bucket.Upload(ctx, objectPath, bytes.NewReader(b.flushBuffer.Bytes())); err != nil { - return err } - - b.Reset() - return nil -} - -func (b *Builder) buildObject() error { - timer := prometheus.NewTimer(b.metrics.buildTime) - defer timer.ObserveDuration() - - // We reset after a successful flush, but we also reset the buffer before - // building for safety. - b.flushBuffer.Reset() - - if err := b.streams.EncodeTo(b.encoder); err != nil { - return fmt.Errorf("encoding streams: %w", err) - } else if err := b.logs.EncodeTo(b.encoder); err != nil { - return fmt.Errorf("encoding logs: %w", err) - } else if err := b.encoder.Flush(); err != nil { - return fmt.Errorf("encoding object: %w", err) - } - - b.metrics.builtSize.Observe(float64(b.flushBuffer.Len())) - - // We pass context.Background() below to avoid allowing building an object to - // time out; timing out on build would discard anything we built and would - // cause data loss. - dec := encoding.ReaderAtDecoder(bytes.NewReader(b.flushBuffer.Bytes()), int64(b.flushBuffer.Len())) - return b.metrics.encoding.Observe(context.Background(), dec) -} - -// Reset discards pending data and resets the builder to an empty state. -func (b *Builder) Reset() { - b.logs.Reset() - b.streams.Reset() - - b.state = builderStateEmpty - b.flushBuffer.Reset() - b.metrics.sizeEstimate.Set(0) -} - -// RegisterMetrics registers metrics about builder to report to reg. All -// metrics will have a tenant label set to the tenant ID of the Builder. -// -// If multiple Builders for the same tenant are running in the same process, -// reg must contain additional labels to differentiate between them. -func (b *Builder) RegisterMetrics(reg prometheus.Registerer) error { - reg = prometheus.WrapRegistererWith(prometheus.Labels{"tenant": b.tenantID}, reg) - return b.metrics.Register(reg) -} - -// UnregisterMetrics unregisters metrics about builder from reg. 
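As a sketch of how the new Object API above might be consumed (describeObject is a hypothetical helper, not part of this diff):

```go
package example

import (
	"context"
	"fmt"

	"github.com/thanos-io/objstore"

	"github.com/grafana/loki/v3/pkg/dataobj"
)

// describeObject opens a stored data object and reports how many sections of
// each kind it contains. The counts tell a caller how many readers to build:
// one LogsReader or StreamsReader per section index of that type.
func describeObject(ctx context.Context, bucket objstore.Bucket, path string) error {
	obj := dataobj.FromBucket(bucket, path)

	md, err := obj.Metadata(ctx)
	if err != nil {
		return fmt.Errorf("reading metadata: %w", err)
	}

	fmt.Printf("%s: %d streams section(s), %d logs section(s)\n",
		path, md.StreamsSections, md.LogsSections)
	return nil
}
```

Section indexes passed to the readers introduced later in this diff are per-type and zero-based, so a valid logs index ranges over [0, LogsSections).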
-func (b *Builder) UnregisterMetrics(reg prometheus.Registerer) { - reg = prometheus.WrapRegistererWith(prometheus.Labels{"tenant": b.tenantID}, reg) - b.metrics.Unregister(reg) + return md, nil } diff --git a/pkg/dataobj/internal/sections/logs/iter.go b/pkg/dataobj/internal/sections/logs/iter.go index 9d2db95e3df5b..361e0bc526df3 100644 --- a/pkg/dataobj/internal/sections/logs/iter.go +++ b/pkg/dataobj/internal/sections/logs/iter.go @@ -33,7 +33,7 @@ func Iter(ctx context.Context, dec encoding.Decoder) result.Seq[Record] { continue } - for result := range iterSection(ctx, logsDec, section) { + for result := range IterSection(ctx, logsDec, section) { if result.Err() != nil || !yield(result.MustValue()) { return result.Err() } @@ -44,7 +44,7 @@ func Iter(ctx context.Context, dec encoding.Decoder) result.Seq[Record] { }) } -func iterSection(ctx context.Context, dec encoding.LogsDecoder, section *filemd.SectionInfo) result.Seq[Record] { +func IterSection(ctx context.Context, dec encoding.LogsDecoder, section *filemd.SectionInfo) result.Seq[Record] { return result.Iter(func(yield func(Record) bool) error { // We need to pull the columns twice: once from the dataset implementation // and once for the metadata to retrieve column type. diff --git a/pkg/dataobj/internal/sections/streams/iter.go b/pkg/dataobj/internal/sections/streams/iter.go index 4443aa10eb646..b75da46262c80 100644 --- a/pkg/dataobj/internal/sections/streams/iter.go +++ b/pkg/dataobj/internal/sections/streams/iter.go @@ -31,7 +31,7 @@ func Iter(ctx context.Context, dec encoding.Decoder) result.Seq[Stream] { continue } - for result := range iterSection(ctx, streamsDec, section) { + for result := range IterSection(ctx, streamsDec, section) { if result.Err() != nil || !yield(result.MustValue()) { return result.Err() } @@ -42,7 +42,7 @@ func Iter(ctx context.Context, dec encoding.Decoder) result.Seq[Stream] { }) } -func iterSection(ctx context.Context, dec encoding.StreamsDecoder, section *filemd.SectionInfo) result.Seq[Stream] { +func IterSection(ctx context.Context, dec encoding.StreamsDecoder, section *filemd.SectionInfo) result.Seq[Stream] { return result.Iter(func(yield func(Stream) bool) error { // We need to pull the columns twice: once from the dataset implementation // and once for the metadata to retrieve column type. diff --git a/pkg/dataobj/logs_reader.go b/pkg/dataobj/logs_reader.go new file mode 100644 index 0000000000000..1261d90acfd58 --- /dev/null +++ b/pkg/dataobj/logs_reader.go @@ -0,0 +1,294 @@ +package dataobj + +import ( + "context" + "fmt" + "io" + "iter" + "sort" + "time" + + "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/loki/pkg/push" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/result" + "github.com/grafana/loki/v3/pkg/dataobj/internal/sections/logs" +) + +// Predicates for reading logs. +type ( + // MetadataMatcher is a predicate for matching metadata in a logs section. + // MetadataMatcher predicates assert that a metadata entry named Name exists + // and its value is set to Value. + // + // For equality matches, MetadataMatcher should always be used; + // MetadataMatchers can translate into more efficient filter operations than + // a [MetadataFilter] can. + MetadataMatcher struct{ Name, Value string } + + // MetadataFilter is a predicate for matching metadata in a logs section. 
+ // MetadataFilter predicates return a true value when the combination of the + // provided metadata entry name and value should be included in the result. + // + // MetadataFilter predicates should be only used for more complex filtering; + // for equality matches, [MetadataMatcher]s are more efficient. + MetadataFilter func(name, value string) bool +) + +// A Record is an individual log record in a data object. +type Record struct { + StreamID int64 // StreamID associated with the log record. + Timestamp time.Time // Timestamp of the log record. + Metadata labels.Labels // Set of metadata associated with the log record. + Line string // Line of the log record. +} + +// LogsReader reads the set of logs from an [Object]. +type LogsReader struct { + obj *Object + idx int + + matchers map[string]string + filters map[string]MetadataFilter + matchIDs map[int64]struct{} + + next func() (result.Result[logs.Record], bool) + stop func() +} + +// NewLogsReader creates a new LogsReader that reads from the logs section of +// the given object. +func NewLogsReader(obj *Object, sectionIndex int) *LogsReader { + var lr LogsReader + lr.Reset(obj, sectionIndex) + return &lr +} + +// MatchStreams provides a sequence of stream IDs for the logs reader to match. +// [LogsReader.Read] will only return logs for the provided stream IDs. +// +// MatchStreams may be called multiple times to match multiple sets of streams. +// +// MatchStreams may only be called before reading begins or after a call to +// [LogsReader.Reset]. +func (r *LogsReader) MatchStreams(ids iter.Seq[int64]) error { + if r.next != nil { + return fmt.Errorf("cannot change matched streams after reading has started") + } + + if r.matchIDs == nil { + r.matchIDs = make(map[int64]struct{}) + } + for id := range ids { + r.matchIDs[id] = struct{}{} + } + return nil +} + +// AddMetadataMatcher adds a metadata matcher to the LogsReader. +// [LogsReader.Read] will only return logs for which the metadata matcher +// predicate passes. +// +// AddMetadataMatcher may only be called before reading begins or after a call +// to [LogsReader.Reset]. +func (r *LogsReader) AddMetadataMatcher(m MetadataMatcher) error { + if r.next != nil { + return fmt.Errorf("cannot add metadata matcher after reading has started") + } + + if r.matchers == nil { + r.matchers = make(map[string]string) + } + r.matchers[m.Name] = m.Value + return nil +} + +// AddMetadataFilter adds a metadata filter to the LogsReader. +// [LogsReader.Read] will only return records for which the metadata filter +// predicate passes. The filter f will be called with the provided key to allow +// the same function to be reused for multiple keys. +// +// AddMetadataFilter may only be called before reading begins or after a call +// to [LogsReader.Reset]. +func (r *LogsReader) AddMetadataFilter(key string, f MetadataFilter) error { + if r.next != nil { + return fmt.Errorf("cannot add metadata filter after reading has started") + } + + if r.filters == nil { + r.filters = make(map[string]MetadataFilter) + } + r.filters[key] = f + return nil +} + +// Read reads up to the next len(s) records from the reader and stores them +// into s. It returns the number of records read and any error encountered. At +// the end of the logs section, Read returns 0, io.EOF. +func (r *LogsReader) Read(ctx context.Context, s []Record) (int, error) { + // TODO(rfratto): The implementation below is the initial, naive approach. 
It + // lacks a few features that will be needed at scale: + // + // * Read columns/pages in batches of len(s), rather than one row at a time, + // + // * Add page-level filtering based on min/max page values to quickly filter + // out batches of rows without needing to download or decode them. + // + // * Download pages in batches, rather than one at a time. + // + // * Only download/decode non-predicate columns following finding rows that + // match all predicate columns. + // + // * Reuse as much memory as possible from a combination of s and the state + // of LogsReader. + // + // These details can change internally without changing the API exposed by + // LogsReader, which is designed to permit efficient use in the future. + + if r.obj == nil { + return 0, io.EOF + } else if r.idx < 0 { + return 0, fmt.Errorf("invalid section index %d", r.idx) + } + + if r.next == nil { + err := r.initIter(ctx) + if err != nil { + return 0, err + } + } + + for i := range s { + res, ok := r.nextMatching() + if !ok { + return i, io.EOF + } + + record, err := res.Value() + if err != nil { + return i, fmt.Errorf("reading record: %w", err) + } + + s[i] = Record{ + StreamID: record.StreamID, + Timestamp: record.Timestamp, + Metadata: convertMetadata(record.Metadata), + Line: record.Line, + } + } + + return len(s), nil +} + +func (r *LogsReader) initIter(ctx context.Context) error { + sec, err := r.findSection(ctx) + if err != nil { + return fmt.Errorf("finding section: %w", err) + } + + if r.stop != nil { + r.stop() + } + + seq := logs.IterSection(ctx, r.obj.dec.LogsDecoder(), sec) + r.next, r.stop = result.Pull(seq) + return nil +} + +func (r *LogsReader) findSection(ctx context.Context) (*filemd.SectionInfo, error) { + si, err := r.obj.dec.Sections(ctx) + if err != nil { + return nil, fmt.Errorf("reading sections: %w", err) + } + + var n int + + for _, s := range si { + if s.Type == filemd.SECTION_TYPE_LOGS { + if n == r.idx { + return s, nil + } + n++ + } + } + + return nil, fmt.Errorf("section index %d not found", r.idx) +} + +func (r *LogsReader) nextMatching() (result.Result[logs.Record], bool) { + if r.next == nil { + return result.Result[logs.Record]{}, false + } + +NextRow: + res, ok := r.next() + if !ok { + return res, ok + } + + record, err := res.Value() + if err != nil { + return res, true + } + + if r.matchIDs != nil { + if _, ok := r.matchIDs[record.StreamID]; !ok { + goto NextRow + } + } + + for key, value := range r.matchers { + if getMetadata(record.Metadata, key) != value { + goto NextRow + } + } + + for key, filter := range r.filters { + if !filter(key, getMetadata(record.Metadata, key)) { + goto NextRow + } + } + + return res, true +} + +func getMetadata(md push.LabelsAdapter, key string) string { + for _, l := range md { + if l.Name == key { + return l.Value + } + } + + return "" +} + +func convertMetadata(md push.LabelsAdapter) labels.Labels { + l := make(labels.Labels, 0, len(md)) + for _, label := range md { + l = append(l, labels.Label{Name: label.Name, Value: label.Value}) + } + sort.Sort(l) + return l +} + +// Reset resets the LogsReader with a new object and section index to read +// from. Reset allows reusing a LogsReader without allocating a new one. +// +// Reset may be called with a nil object and a negative section index to clear +// the LogsReader without needing a new object. 
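A hedged sketch of driving the LogsReader API above; the stream IDs, the trace_id prefix filter, and readSection itself are all illustrative assumptions.

```go
package example

import (
	"context"
	"errors"
	"io"
	"slices"
	"strings"

	"github.com/grafana/loki/v3/pkg/dataobj"
)

// readSection drains one logs section, keeping only records from the given
// stream IDs whose trace_id metadata starts with the provided prefix.
func readSection(ctx context.Context, obj *dataobj.Object, section int, streamIDs []int64, tracePrefix string) ([]dataobj.Record, error) {
	r := dataobj.NewLogsReader(obj, section)

	// Predicates must be registered before the first Read.
	if err := r.MatchStreams(slices.Values(streamIDs)); err != nil {
		return nil, err
	}
	if err := r.AddMetadataFilter("trace_id", func(_, value string) bool {
		return strings.HasPrefix(value, tracePrefix)
	}); err != nil {
		return nil, err
	}

	var (
		out []dataobj.Record
		buf = make([]dataobj.Record, 128) // scratch batch, reused at full length
	)
	for {
		n, err := r.Read(ctx, buf)
		out = append(out, buf[:n]...)
		if errors.Is(err, io.EOF) {
			return out, nil
		} else if err != nil {
			return out, err
		}
	}
}
```

Read fills at most len(s) records per call and can return a short count together with io.EOF, so the scratch slice needs a non-zero length on every call and the count should be consumed before checking the error.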
+func (r *LogsReader) Reset(obj *Object, sectionIndex int) { + if r.stop != nil { + r.stop() + } + + r.obj = obj + r.idx = sectionIndex + r.next = nil + r.stop = nil + + clear(r.matchers) + clear(r.filters) + clear(r.matchIDs) +} diff --git a/pkg/dataobj/logs_reader_test.go b/pkg/dataobj/logs_reader_test.go new file mode 100644 index 0000000000000..140049df24d61 --- /dev/null +++ b/pkg/dataobj/logs_reader_test.go @@ -0,0 +1,183 @@ +package dataobj_test + +import ( + "bytes" + "context" + "errors" + "io" + "slices" + "strings" + "testing" + + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/push" + + "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/dataobj/internal/encoding" + "github.com/grafana/loki/v3/pkg/dataobj/internal/sections/logs" +) + +var recordsTestdata = []logs.Record{ + {StreamID: 1, Timestamp: unixTime(10), Metadata: nil, Line: "hello"}, + {StreamID: 1, Timestamp: unixTime(15), Metadata: metadata("trace_id", "123"), Line: "world"}, + {StreamID: 2, Timestamp: unixTime(5), Metadata: nil, Line: "hello again"}, + {StreamID: 2, Timestamp: unixTime(20), Metadata: metadata("user", "12"), Line: "world again"}, + {StreamID: 3, Timestamp: unixTime(25), Metadata: metadata("user", "14"), Line: "hello one more time"}, + {StreamID: 3, Timestamp: unixTime(30), Metadata: metadata("trace_id", "123"), Line: "world one more time"}, +} + +func metadata(kvps ...string) push.LabelsAdapter { + if len(kvps)%2 != 0 { + panic("metadata: odd number of key-value pairs") + } + + m := make(push.LabelsAdapter, len(kvps)/2) + for i := 0; i < len(kvps); i += 2 { + m = append(m, push.LabelAdapter{Name: kvps[i], Value: kvps[i+1]}) + } + return m +} + +func TestLogsReader(t *testing.T) { + expect := []dataobj.Record{ + {1, unixTime(10), labels.FromStrings(), "hello"}, + {1, unixTime(15), labels.FromStrings("trace_id", "123"), "world"}, + {2, unixTime(5), labels.FromStrings(), "hello again"}, + {2, unixTime(20), labels.FromStrings("user", "12"), "world again"}, + {3, unixTime(25), labels.FromStrings("user", "14"), "hello one more time"}, + {3, unixTime(30), labels.FromStrings("trace_id", "123"), "world one more time"}, + } + + // Build with many pages but one section. + obj := buildLogsObject(t, logs.Options{ + PageSizeHint: 1, + BufferSize: 1, + SectionSize: 1024, + }) + md, err := obj.Metadata(context.Background()) + require.NoError(t, err) + require.Equal(t, 1, md.LogsSections) + + r := dataobj.NewLogsReader(obj, 0) + actual, err := readAllRecords(context.Background(), r) + require.NoError(t, err) + require.Equal(t, expect, actual) +} + +func TestLogsReader_MatchStreams(t *testing.T) { + expect := []dataobj.Record{ + {1, unixTime(10), labels.FromStrings(), "hello"}, + {1, unixTime(15), labels.FromStrings("trace_id", "123"), "world"}, + {3, unixTime(25), labels.FromStrings("user", "14"), "hello one more time"}, + {3, unixTime(30), labels.FromStrings("trace_id", "123"), "world one more time"}, + } + + // Build with many pages but one section. 
+ obj := buildLogsObject(t, logs.Options{ + PageSizeHint: 1, + BufferSize: 1, + SectionSize: 1024, + }) + md, err := obj.Metadata(context.Background()) + require.NoError(t, err) + require.Equal(t, 1, md.LogsSections) + + r := dataobj.NewLogsReader(obj, 0) + require.NoError(t, r.MatchStreams(slices.Values([]int64{1, 3}))) + + actual, err := readAllRecords(context.Background(), r) + require.NoError(t, err) + require.Equal(t, expect, actual) +} + +func TestLogsReader_AddMetadataMatcher(t *testing.T) { + expect := []dataobj.Record{ + {1, unixTime(15), labels.FromStrings("trace_id", "123"), "world"}, + {3, unixTime(30), labels.FromStrings("trace_id", "123"), "world one more time"}, + } + + // Build with many pages but one section. + obj := buildLogsObject(t, logs.Options{ + PageSizeHint: 1, + BufferSize: 1, + SectionSize: 1024, + }) + md, err := obj.Metadata(context.Background()) + require.NoError(t, err) + require.Equal(t, 1, md.LogsSections) + + r := dataobj.NewLogsReader(obj, 0) + require.NoError(t, r.AddMetadataMatcher(dataobj.MetadataMatcher{"trace_id", "123"})) + + actual, err := readAllRecords(context.Background(), r) + require.NoError(t, err) + require.Equal(t, expect, actual) +} + +func TestLogsReader_AddMetadataFilter(t *testing.T) { + expect := []dataobj.Record{ + {2, unixTime(20), labels.FromStrings("user", "12"), "world again"}, + {3, unixTime(25), labels.FromStrings("user", "14"), "hello one more time"}, + } + + // Build with many pages but one section. + obj := buildLogsObject(t, logs.Options{ + PageSizeHint: 1, + BufferSize: 1, + SectionSize: 1024, + }) + md, err := obj.Metadata(context.Background()) + require.NoError(t, err) + require.Equal(t, 1, md.LogsSections) + + r := dataobj.NewLogsReader(obj, 0) + err = r.AddMetadataFilter("user", func(name, value string) bool { + require.Equal(t, "user", name) + return strings.HasPrefix(value, "1") + }) + require.NoError(t, err) + + actual, err := readAllRecords(context.Background(), r) + require.NoError(t, err) + require.Equal(t, expect, actual) +} + +func buildLogsObject(t *testing.T, opts logs.Options) *dataobj.Object { + t.Helper() + + s := logs.New(nil, opts) + for _, rec := range recordsTestdata { + s.Append(rec) + } + + var buf bytes.Buffer + + enc := encoding.NewEncoder(&buf) + require.NoError(t, s.EncodeTo(enc)) + require.NoError(t, enc.Flush()) + + return dataobj.FromReaderAt(bytes.NewReader(buf.Bytes()), int64(buf.Len())) +} + +func readAllRecords(ctx context.Context, r *dataobj.LogsReader) ([]dataobj.Record, error) { + var ( + res []dataobj.Record + buf = make([]dataobj.Record, 128) + ) + + for { + n, err := r.Read(ctx, buf) + if n > 0 { + res = append(res, buf[:n]...) + } + if errors.Is(err, io.EOF) { + return res, nil + } else if err != nil { + return res, err + } + + buf = buf[:0] + } +} diff --git a/pkg/dataobj/reader.go b/pkg/dataobj/reader.go deleted file mode 100644 index aa5b02c2bc03e..0000000000000 --- a/pkg/dataobj/reader.go +++ /dev/null @@ -1,111 +0,0 @@ -package dataobj - -import ( - "context" - "errors" - "fmt" - - "github.com/thanos-io/objstore" - - "github.com/grafana/loki/v3/pkg/dataobj/internal/encoding" - "github.com/grafana/loki/v3/pkg/dataobj/internal/result" - "github.com/grafana/loki/v3/pkg/dataobj/internal/sections/logs" - "github.com/grafana/loki/v3/pkg/dataobj/internal/sections/streams" - "github.com/grafana/loki/v3/pkg/logproto" -) - -// reader connects to an object storage bucket and supports basic reading from -// data objects. 
-// -// reader isn't exposed as a public API because it's insufficient for reading -// at scale; more work is needed to support efficient reads and filtering data. -// At the moment, reader is only used for tests. -type reader struct { - bucket objstore.Bucket -} - -func newReader(bucket objstore.Bucket) *reader { - return &reader{bucket: bucket} -} - -// Objects returns an iterator over all data objects for the provided tenant. -func (r *reader) Objects(ctx context.Context, tenant string) result.Seq[string] { - tenantPath := fmt.Sprintf("tenant-%s/objects/", tenant) - - return result.Iter(func(yield func(string) bool) error { - errIterationStopped := errors.New("iteration stopped") - - err := r.bucket.Iter(ctx, tenantPath, func(name string) error { - if !yield(name) { - return errIterationStopped - } - return nil - }, objstore.WithRecursiveIter()) - - switch { - case errors.Is(err, errIterationStopped): - return nil - default: - return err - } - }) -} - -// Streams returns an iterator over all [logproto.Stream] entries for the -// provided object. Each emitted stream contains all logs for that stream in -// ascending timestamp order. Streams are emitted in in the order they were -// first appended to the data object. -func (r *reader) Streams(ctx context.Context, object string) result.Seq[logproto.Stream] { - return result.Iter(func(yield func(logproto.Stream) bool) error { - dec := encoding.BucketDecoder(r.bucket, object) - - streamRecords, err := result.Collect(streams.Iter(ctx, dec)) - if err != nil { - return fmt.Errorf("reading streams dataset: %w", err) - } - streamRecordLookup := make(map[int64]streams.Stream, len(streamRecords)) - for _, stream := range streamRecords { - streamRecordLookup[stream.ID] = stream - } - - var ( - lastID int64 - batch logproto.Stream - ) - - for result := range logs.Iter(ctx, dec) { - record, err := result.Value() - if err != nil { - return fmt.Errorf("iterating over logs: %w", err) - } - - if lastID != record.StreamID { - if lastID != 0 && !yield(batch) { - return nil - } - - streamRecord := streamRecordLookup[record.StreamID] - - batch = logproto.Stream{ - Labels: streamRecord.Labels.String(), - Hash: streamRecord.Labels.Hash(), - } - - lastID = record.StreamID - } - - batch.Entries = append(batch.Entries, logproto.Entry{ - Timestamp: record.Timestamp, - Line: record.Line, - StructuredMetadata: record.Metadata, - }) - } - if len(batch.Entries) > 0 { - if !yield(batch) { - return nil - } - } - - return nil - }) -} diff --git a/pkg/dataobj/streams_reader.go b/pkg/dataobj/streams_reader.go new file mode 100644 index 0000000000000..12bcd2244bf07 --- /dev/null +++ b/pkg/dataobj/streams_reader.go @@ -0,0 +1,248 @@ +package dataobj + +import ( + "context" + "fmt" + "io" + "time" + + "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/result" + "github.com/grafana/loki/v3/pkg/dataobj/internal/sections/streams" +) + +// Predicates for reading streams. +type ( + // LabelMatcher is a predicate for matching labels in a streams section. + // LabelMatcher predicates assert that a label named Name exists and its + // value is set to Value. + // + // For equality matches, LabelMatcher should always be used; LabelMatchers + // can translate into more efficient filter operations than a [LabelFilter] + // can. + LabelMatcher struct{ Name, Value string } + + // LabelFilter is a predicate for matching labels in a streams section. 
+ // LabelFilter predicates return a true value when the combination of the + // provided label name and value should be included in the result. + // + // LabelFilter predicates should be only used for more complex filtering; for + // equality matches, [LabelMatcher]s are more efficient. + LabelFilter func(name, value string) bool +) + +// A Stream is an individual stream in a data object. +type Stream struct { + // ID of the stream. Stream IDs are unique across all sections in an object, + // but not across multiple objects. + ID int64 + + // MinTime and MaxTime denote the range of timestamps across all entries in + // the stream. + MinTime, MaxTime time.Time + + // Labels of the stream. + Labels labels.Labels +} + +// StreamsReader reads the set of streams from an [Object]. +type StreamsReader struct { + obj *Object + idx int + + matchers map[string]string + filters map[string]LabelFilter + + next func() (result.Result[streams.Stream], bool) + stop func() +} + +// NewStreamsReader creates a new StreamsReader that reads from the streams +// section of the given object. +func NewStreamsReader(obj *Object, sectionIndex int) *StreamsReader { + var sr StreamsReader + sr.Reset(obj, sectionIndex) + return &sr +} + +// AddLabelMatcher adds a label matcher to the StreamsReader. +// [StreamsReader.Read] will only return streams for which the label matcher +// predicate passes. +// +// AddLabelMatcher may only be called before reading begins or after a call to +// [StreamsReader.Reset]. +func (r *StreamsReader) AddLabelMatcher(m LabelMatcher) error { + if r.next != nil { + return fmt.Errorf("cannot add label matcher after reading has started") + } + + if r.matchers == nil { + r.matchers = make(map[string]string) + } + r.matchers[m.Name] = m.Value + return nil +} + +// AddLabelFilter adds a label filter to the StreamsReader. +// [StreamsReader.Read] will only return streams for which the label filter +// predicate passes. The filter f will be called with the provided key to allow +// the same function to be reused for multiple keys. +// +// AddLabelFilter may only be called before reading begins or after a call to +// [StreamsReader.Reset]. +func (r *StreamsReader) AddLabelFilter(key string, f LabelFilter) error { + if r.next != nil { + return fmt.Errorf("cannot add label filter after reading has started") + } + + if r.filters == nil { + r.filters = make(map[string]LabelFilter) + } + r.filters[key] = f + return nil +} + +// Read reads up to the next len(s) streams from the reader and stores them +// into s. It returns the number of streams read and any error encountered. At +// the end of the stream section, Read returns 0, io.EOF. +func (r *StreamsReader) Read(ctx context.Context, s []Stream) (int, error) { + // TODO(rfratto): The implementation below is the initial, naive approach. It + // lacks a few features that will be needed at scale: + // + // * Read columns/pages in batches of len(s), rather than one row at a time, + // + // * Add page-level filtering based on min/max page values to quickly filter + // out batches of rows without needing to download or decode them. + // + // * Download pages in batches, rather than one at a time. + // + // * Only download/decode non-predicate columns following finding rows that + // match all predicate columns. + // + // * Reuse as much memory as possible from a combination of s and the state + // of StreamsReader. 
+ // + // These details can change internally without changing the API exposed by + // StreamsReader, which is designed to permit efficient use in the future. + + if r.obj == nil { + return 0, io.EOF + } else if r.idx < 0 { + return 0, fmt.Errorf("invalid section index %d", r.idx) + } + + if r.next == nil { + err := r.initIter(ctx) + if err != nil { + return 0, err + } + } + + for i := range s { + res, ok := r.nextMatching() + if !ok { + return i, io.EOF + } + + stream, err := res.Value() + if err != nil { + return i, fmt.Errorf("reading stream: %w", err) + } + + s[i] = Stream{ + ID: stream.ID, + MinTime: stream.MinTimestamp, + MaxTime: stream.MaxTimestamp, + Labels: stream.Labels, + } + } + + return len(s), nil +} + +func (r *StreamsReader) initIter(ctx context.Context) error { + sec, err := r.findSection(ctx) + if err != nil { + return fmt.Errorf("finding section: %w", err) + } + + if r.stop != nil { + r.stop() + } + + seq := streams.IterSection(ctx, r.obj.dec.StreamsDecoder(), sec) + r.next, r.stop = result.Pull(seq) + return nil +} + +func (r *StreamsReader) findSection(ctx context.Context) (*filemd.SectionInfo, error) { + si, err := r.obj.dec.Sections(ctx) + if err != nil { + return nil, fmt.Errorf("reading sections: %w", err) + } + + var n int + + for _, s := range si { + if s.Type == filemd.SECTION_TYPE_STREAMS { + if n == r.idx { + return s, nil + } + n++ + } + } + + return nil, fmt.Errorf("section index %d not found", r.idx) +} + +func (r *StreamsReader) nextMatching() (result.Result[streams.Stream], bool) { + if r.next == nil { + return result.Result[streams.Stream]{}, false + } + +NextRow: + res, ok := r.next() + if !ok { + return res, ok + } + + stream, err := res.Value() + if err != nil { + return res, true + } + + for key, value := range r.matchers { + if stream.Labels.Get(key) != value { + goto NextRow + } + } + + for key, filter := range r.filters { + if !filter(key, stream.Labels.Get(key)) { + goto NextRow + } + } + + return res, true +} + +// Reset resets the StreamsReader with a new object and section index to read +// from. Reset allows reusing a StreamsReader without allocating a new one. +// +// Reset may be called with a nil object and a negative section index to clear +// the StreamsReader without needing a new object. 
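One plausible way to bridge Prometheus-style matchers onto the predicates above; applyMatchers is a sketch, not an API added by this diff.

```go
package example

import (
	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/loki/v3/pkg/dataobj"
)

// applyMatchers registers equality matchers as the cheaper LabelMatcher and
// falls back to a LabelFilter built on Matcher.Matches for everything else.
func applyMatchers(r *dataobj.StreamsReader, ms []*labels.Matcher) error {
	for _, m := range ms {
		if m.Type == labels.MatchEqual {
			if err := r.AddLabelMatcher(dataobj.LabelMatcher{Name: m.Name, Value: m.Value}); err != nil {
				return err
			}
			continue
		}

		m := m // keep a stable copy for the closure
		if err := r.AddLabelFilter(m.Name, func(_, value string) bool {
			return m.Matches(value)
		}); err != nil {
			return err
		}
	}
	return nil
}
```

Since matchers and filters are stored in maps keyed by label name, a second predicate on the same name replaces the first; callers with several non-equality matchers on one label would need to fold them into a single filter.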
+func (r *StreamsReader) Reset(obj *Object, sectionIndex int) { + if r.stop != nil { + r.stop() + } + + r.obj = obj + r.idx = sectionIndex + r.next = nil + r.stop = nil + + clear(r.matchers) + clear(r.filters) +} diff --git a/pkg/dataobj/streams_reader_test.go b/pkg/dataobj/streams_reader_test.go new file mode 100644 index 0000000000000..59ce1a3b45556 --- /dev/null +++ b/pkg/dataobj/streams_reader_test.go @@ -0,0 +1,131 @@ +package dataobj_test + +import ( + "bytes" + "context" + "errors" + "io" + "strings" + "testing" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/dataobj/internal/encoding" + "github.com/grafana/loki/v3/pkg/dataobj/internal/sections/streams" +) + +var streamsTestdata = []struct { + Labels labels.Labels + Timestamp time.Time +}{ + {labels.FromStrings("cluster", "test", "app", "foo"), unixTime(10)}, + {labels.FromStrings("cluster", "test", "app", "foo"), unixTime(15)}, + {labels.FromStrings("cluster", "test", "app", "bar"), unixTime(5)}, + {labels.FromStrings("cluster", "test", "app", "bar"), unixTime(20)}, + {labels.FromStrings("cluster", "test", "app", "baz"), unixTime(25)}, + {labels.FromStrings("cluster", "test", "app", "baz"), unixTime(30)}, +} + +func TestStreamsReader(t *testing.T) { + expect := []dataobj.Stream{ + {1, unixTime(10), unixTime(15), labels.FromStrings("cluster", "test", "app", "foo")}, + {2, unixTime(5), unixTime(20), labels.FromStrings("cluster", "test", "app", "bar")}, + {3, unixTime(25), unixTime(30), labels.FromStrings("cluster", "test", "app", "baz")}, + } + + obj := buildStreamsObject(t, 1) // Many pages + md, err := obj.Metadata(context.Background()) + require.NoError(t, err) + require.Equal(t, 1, md.StreamsSections) + + r := dataobj.NewStreamsReader(obj, 0) + actual, err := readAllStreams(context.Background(), r) + require.NoError(t, err) + require.Equal(t, expect, actual) +} + +func TestStreamsReader_AddLabelMatcher(t *testing.T) { + expect := []dataobj.Stream{ + {2, unixTime(5), unixTime(20), labels.FromStrings("cluster", "test", "app", "bar")}, + } + + obj := buildStreamsObject(t, 1) // Many pages + md, err := obj.Metadata(context.Background()) + require.NoError(t, err) + require.Equal(t, 1, md.StreamsSections) + + r := dataobj.NewStreamsReader(obj, 0) + require.NoError(t, r.AddLabelMatcher(dataobj.LabelMatcher{Name: "app", Value: "bar"})) + + actual, err := readAllStreams(context.Background(), r) + require.NoError(t, err) + require.Equal(t, expect, actual) +} + +func TestStreamsReader_AddLabelFilter(t *testing.T) { + expect := []dataobj.Stream{ + {2, unixTime(5), unixTime(20), labels.FromStrings("cluster", "test", "app", "bar")}, + {3, unixTime(25), unixTime(30), labels.FromStrings("cluster", "test", "app", "baz")}, + } + + obj := buildStreamsObject(t, 1) // Many pages + md, err := obj.Metadata(context.Background()) + require.NoError(t, err) + require.Equal(t, 1, md.StreamsSections) + + r := dataobj.NewStreamsReader(obj, 0) + err = r.AddLabelFilter("app", func(key string, value string) bool { + require.Equal(t, "app", key) + return strings.HasPrefix(value, "b") + }) + require.NoError(t, err) + + actual, err := readAllStreams(context.Background(), r) + require.NoError(t, err) + require.Equal(t, expect, actual) +} + +func unixTime(sec int64) time.Time { + return time.Unix(sec, 0).UTC() +} + +func buildStreamsObject(t *testing.T, pageSize int) *dataobj.Object { + t.Helper() + + s := streams.New(nil, pageSize) + 
for _, d := range streamsTestdata { + s.Record(d.Labels, d.Timestamp) + } + + var buf bytes.Buffer + + enc := encoding.NewEncoder(&buf) + require.NoError(t, s.EncodeTo(enc)) + require.NoError(t, enc.Flush()) + + return dataobj.FromReaderAt(bytes.NewReader(buf.Bytes()), int64(buf.Len())) +} + +func readAllStreams(ctx context.Context, r *dataobj.StreamsReader) ([]dataobj.Stream, error) { + var ( + res []dataobj.Stream + buf = make([]dataobj.Stream, 128) + ) + + for { + n, err := r.Read(ctx, buf) + if n > 0 { + res = append(res, buf[:n]...) + } + if errors.Is(err, io.EOF) { + return res, nil + } else if err != nil { + return res, err + } + + buf = buf[:0] + } +}
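To close the loop, here is a sketch of roughly recreating what the deleted reader.go used to do: look up matching streams, then restrict a LogsReader to those stream IDs. The helper names and the app label are assumptions for illustration.

```go
package example

import (
	"context"
	"errors"
	"io"
	"maps"

	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/loki/v3/pkg/dataobj"
)

// matchedStreams reads an entire streams section and returns the labels of
// every stream whose app label equals the given value, keyed by stream ID.
func matchedStreams(ctx context.Context, obj *dataobj.Object, section int, app string) (map[int64]labels.Labels, error) {
	r := dataobj.NewStreamsReader(obj, section)
	if err := r.AddLabelMatcher(dataobj.LabelMatcher{Name: "app", Value: app}); err != nil {
		return nil, err
	}

	var (
		byID = make(map[int64]labels.Labels)
		buf  = make([]dataobj.Stream, 128)
	)
	for {
		n, err := r.Read(ctx, buf)
		for _, s := range buf[:n] {
			byID[s.ID] = s.Labels
		}
		if errors.Is(err, io.EOF) {
			return byID, nil
		} else if err != nil {
			return byID, err
		}
	}
}

// restrictLogs feeds the matched stream IDs into a LogsReader so that Read
// only returns records belonging to those streams.
func restrictLogs(lr *dataobj.LogsReader, byID map[int64]labels.Labels) error {
	return lr.MatchStreams(maps.Keys(byID))
}
```

Unlike the old reader, this stays per-section; a caller joining across a whole object would loop over the StreamsSections and LogsSections counts from Object.Metadata.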