diff --git a/modules/blockbuilder/blockbuilder_test.go b/modules/blockbuilder/blockbuilder_test.go index 22462e6b540..f09cfca100a 100644 --- a/modules/blockbuilder/blockbuilder_test.go +++ b/modules/blockbuilder/blockbuilder_test.go @@ -494,11 +494,35 @@ func BenchmarkBlockBuilder(b *testing.B) { store = newStoreWithLogger(ctx, b, logger) cfg = blockbuilderConfig(b, address) client = newKafkaClient(b, cfg.IngestStorageConfig.Kafka) + o = &mockOverrides{ + dc: backend.DedicatedColumns{ + backend.DedicatedColumn{Scope: backend.DedicatedColumnScopeResource, Name: "res0", Type: backend.DedicatedColumnTypeString}, + backend.DedicatedColumn{Scope: backend.DedicatedColumnScopeResource, Name: "res1", Type: backend.DedicatedColumnTypeString}, + backend.DedicatedColumn{Scope: backend.DedicatedColumnScopeResource, Name: "res2", Type: backend.DedicatedColumnTypeString}, + backend.DedicatedColumn{Scope: backend.DedicatedColumnScopeResource, Name: "res3", Type: backend.DedicatedColumnTypeString}, + backend.DedicatedColumn{Scope: backend.DedicatedColumnScopeResource, Name: "res4", Type: backend.DedicatedColumnTypeString}, + backend.DedicatedColumn{Scope: backend.DedicatedColumnScopeResource, Name: "res5", Type: backend.DedicatedColumnTypeString}, + backend.DedicatedColumn{Scope: backend.DedicatedColumnScopeResource, Name: "res6", Type: backend.DedicatedColumnTypeString}, + backend.DedicatedColumn{Scope: backend.DedicatedColumnScopeResource, Name: "res7", Type: backend.DedicatedColumnTypeString}, + backend.DedicatedColumn{Scope: backend.DedicatedColumnScopeResource, Name: "res8", Type: backend.DedicatedColumnTypeString}, + backend.DedicatedColumn{Scope: backend.DedicatedColumnScopeResource, Name: "res9", Type: backend.DedicatedColumnTypeString}, + backend.DedicatedColumn{Scope: backend.DedicatedColumnScopeSpan, Name: "span0", Type: backend.DedicatedColumnTypeString}, + backend.DedicatedColumn{Scope: backend.DedicatedColumnScopeSpan, Name: "span1", Type: backend.DedicatedColumnTypeString}, + backend.DedicatedColumn{Scope: backend.DedicatedColumnScopeSpan, Name: "span2", Type: backend.DedicatedColumnTypeString}, + backend.DedicatedColumn{Scope: backend.DedicatedColumnScopeSpan, Name: "span3", Type: backend.DedicatedColumnTypeString}, + backend.DedicatedColumn{Scope: backend.DedicatedColumnScopeSpan, Name: "span4", Type: backend.DedicatedColumnTypeString}, + backend.DedicatedColumn{Scope: backend.DedicatedColumnScopeSpan, Name: "span5", Type: backend.DedicatedColumnTypeString}, + backend.DedicatedColumn{Scope: backend.DedicatedColumnScopeSpan, Name: "span6", Type: backend.DedicatedColumnTypeString}, + backend.DedicatedColumn{Scope: backend.DedicatedColumnScopeSpan, Name: "span7", Type: backend.DedicatedColumnTypeString}, + backend.DedicatedColumn{Scope: backend.DedicatedColumnScopeSpan, Name: "span8", Type: backend.DedicatedColumnTypeString}, + backend.DedicatedColumn{Scope: backend.DedicatedColumnScopeSpan, Name: "span9", Type: backend.DedicatedColumnTypeString}, + }, + } ) cfg.ConsumeCycleDuration = 1 * time.Hour - bb := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store) + bb := New(cfg, logger, newPartitionRingReader(), o, store) defer func() { require.NoError(b, bb.stopping(nil)) }() // Startup (without starting the background consume cycle) diff --git a/tempodb/encoding/vparquet4/schema.go b/tempodb/encoding/vparquet4/schema.go index 64cb6299866..af3f1c26471 100644 --- a/tempodb/encoding/vparquet4/schema.go +++ b/tempodb/encoding/vparquet4/schema.go @@ -340,6 +340,14 @@ func attrToParquetTypeUnsupported(a *v1.KeyValue, p *Attribute) { // traceToParquet converts a tempopb.Trace to this schema's object model. Returns the new object and // a bool indicating if it's a connected trace or not func traceToParquet(meta *backend.BlockMeta, id common.ID, tr *tempopb.Trace, ot *Trace) (*Trace, bool) { + // Dedicated attribute column assignments + dedicatedResourceAttributes := dedicatedColumnsToColumnMapping(meta.DedicatedColumns, backend.DedicatedColumnScopeResource) + dedicatedSpanAttributes := dedicatedColumnsToColumnMapping(meta.DedicatedColumns, backend.DedicatedColumnScopeSpan) + + return traceToParquetWithMapping(id, tr, ot, dedicatedResourceAttributes, dedicatedSpanAttributes) +} + +func traceToParquetWithMapping(id common.ID, tr *tempopb.Trace, ot *Trace, dedicatedResourceAttributes, dedicatedSpanAttributes dedicatedColumnMapping) (*Trace, bool) { if ot == nil { ot = &Trace{} } @@ -353,10 +361,6 @@ func traceToParquet(meta *backend.BlockMeta, id common.ID, tr *tempopb.Trace, ot var rootSpan *v1_trace.Span var rootBatch *v1_trace.ResourceSpans - // Dedicated attribute column assignments - dedicatedResourceAttributes := dedicatedColumnsToColumnMapping(meta.DedicatedColumns, backend.DedicatedColumnScopeResource) - dedicatedSpanAttributes := dedicatedColumnsToColumnMapping(meta.DedicatedColumns, backend.DedicatedColumnScopeSpan) - ot.ResourceSpans = extendReuseSlice(len(tr.ResourceSpans), ot.ResourceSpans) for ib, b := range tr.ResourceSpans { ob := &ot.ResourceSpans[ib] diff --git a/tempodb/encoding/vparquet4/wal_block.go b/tempodb/encoding/vparquet4/wal_block.go index 627d1180c2a..8ca9bc25686 100644 --- a/tempodb/encoding/vparquet4/wal_block.go +++ b/tempodb/encoding/vparquet4/wal_block.go @@ -82,6 +82,8 @@ func openWALBlock(filename, path string, ingestionSlack, _ time.Duration) (commo path: path, ids: common.NewIDMap[int64](), ingestionSlack: ingestionSlack, + dedcolsRes: dedicatedColumnsToColumnMapping(meta.DedicatedColumns, backend.DedicatedColumnScopeResource), + dedcolsSpan: dedicatedColumnsToColumnMapping(meta.DedicatedColumns, backend.DedicatedColumnScopeSpan), } // read all files in dir @@ -161,6 +163,8 @@ func createWALBlock(meta *backend.BlockMeta, filepath, dataEncoding string, inge path: filepath, ids: common.NewIDMap[int64](), ingestionSlack: ingestionSlack, + dedcolsRes: dedicatedColumnsToColumnMapping(meta.DedicatedColumns, backend.DedicatedColumnScopeResource), + dedcolsSpan: dedicatedColumnsToColumnMapping(meta.DedicatedColumns, backend.DedicatedColumnScopeSpan), } // build folder @@ -284,6 +288,8 @@ type walBlock struct { meta *backend.BlockMeta path string ingestionSlack time.Duration + dedcolsRes dedicatedColumnMapping + dedcolsSpan dedicatedColumnMapping // Unflushed data buffer *Trace @@ -331,7 +337,7 @@ func (b *walBlock) Append(id common.ID, buff []byte, start, end uint32) error { func (b *walBlock) AppendTrace(id common.ID, trace *tempopb.Trace, start, end uint32) error { var connected bool - b.buffer, connected = traceToParquet(b.meta, id, trace, b.buffer) + b.buffer, connected = traceToParquetWithMapping(id, trace, b.buffer, b.dedcolsRes, b.dedcolsSpan) if !connected { dataquality.WarnDisconnectedTrace(b.meta.TenantID, dataquality.PhaseTraceFlushedToWal) }