fix(blooms): Minor fixes and improvements for testing in dev #13341

Merged · 2 commits · Jun 27, 2024
13 changes: 8 additions & 5 deletions pkg/bloombuild/builder/builder.go
@@ -42,7 +42,7 @@ type Builder struct {
 	logger log.Logger
 
 	tsdbStore   common.TSDBStore
-	bloomStore  bloomshipper.StoreBase
+	bloomStore  bloomshipper.Store
 	chunkLoader ChunkLoader
 
 	client protos.PlannerForBuilderClient
@@ -55,20 +55,23 @@ func New(
 	storeCfg storage.Config,
 	storageMetrics storage.ClientMetrics,
 	fetcherProvider stores.ChunkFetcherProvider,
-	bloomStore bloomshipper.StoreBase,
+	bloomStore bloomshipper.Store,
 	logger log.Logger,
 	r prometheus.Registerer,
 ) (*Builder, error) {
 	utillog.WarnExperimentalUse("Bloom Builder", logger)
 
+	builderID := uuid.NewString()
+	logger = log.With(logger, "builder_id", builderID)
+
 	tsdbStore, err := common.NewTSDBStores(schemaCfg, storeCfg, storageMetrics, logger)
 	if err != nil {
 		return nil, fmt.Errorf("error creating TSDB store: %w", err)
 	}
 
-	metrics := NewMetrics(r, v1.NewMetrics(r))
+	metrics := NewMetrics(r, bloomStore.BloomMetrics())
 	b := &Builder{
-		ID:      uuid.NewString(),
+		ID:      builderID,
 		cfg:     cfg,
 		limits:  limits,
 		metrics: metrics,
@@ -341,7 +344,7 @@ func (b *Builder) processTask(
 		blocksIter,
 		b.rwFn,
 		nil, // TODO(salvacorts): Pass reporter or remove when we address tracking
-		b.metrics,
+		b.bloomStore.BloomMetrics(),
 		logger,
 	)
 
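Taken together, the builder.go changes mean the builder no longer constructs its own v1 bloom metrics: it reads them from the bloomshipper.Store it is handed, and it generates its ID once so the struct and its logger agree. The sketch below illustrates that wiring only; Store, BloomMetrics, Builder, and NewBuilder here are simplified stand-ins, not the real Loki types.

```go
package main

import (
	"errors"
	"fmt"

	"github.com/google/uuid"
)

// BloomMetrics is a stand-in for *v1.Metrics; the real type lives in
// pkg/storage/bloom/v1 and is shared via the bloom store.
type BloomMetrics struct{}

// Store is a trimmed stand-in for bloomshipper.Store: the relevant point of
// the change is that the store interface exposes the shared bloom metrics.
type Store interface {
	BloomMetrics() *BloomMetrics
}

// Builder mirrors the shape of the change: it keeps the store and reuses the
// store's metrics instead of constructing a second, disconnected set.
type Builder struct {
	ID         string
	bloomStore Store
}

func NewBuilder(store Store) (*Builder, error) {
	if store == nil {
		return nil, errors.New("bloom store is required")
	}
	// Generate the ID once and reuse it for the struct and for logging,
	// instead of calling uuid.NewString() in two places.
	builderID := uuid.NewString()
	return &Builder{ID: builderID, bloomStore: store}, nil
}

type inMemoryStore struct{ metrics BloomMetrics }

func (s *inMemoryStore) BloomMetrics() *BloomMetrics { return &s.metrics }

func main() {
	b, err := NewBuilder(&inMemoryStore{})
	if err != nil {
		panic(err)
	}
	fmt.Println("builder", b.ID, "shares store metrics:", b.bloomStore.BloomMetrics() != nil)
}
```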
12 changes: 11 additions & 1 deletion pkg/bloombuild/builder/builder_test.go
@@ -20,8 +20,10 @@ import (
 
 	"github.com/grafana/loki/v3/pkg/bloombuild/protos"
 	"github.com/grafana/loki/v3/pkg/storage"
+	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
 	"github.com/grafana/loki/v3/pkg/storage/chunk/client/local"
 	"github.com/grafana/loki/v3/pkg/storage/config"
+	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
 	bloomshipperconfig "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
 	"github.com/grafana/loki/v3/pkg/storage/types"
 )
@@ -86,7 +88,7 @@ func Test_BuilderLoop(t *testing.T) {
 	}
 	flagext.DefaultValues(&cfg.GrpcConfig)
 
-	builder, err := New(cfg, limits, schemaCfg, storageCfg, storage.NewClientMetrics(), nil, nil, logger, prometheus.DefaultRegisterer)
+	builder, err := New(cfg, limits, schemaCfg, storageCfg, storage.NewClientMetrics(), nil, fakeBloomStore{}, logger, prometheus.DefaultRegisterer)
 	require.NoError(t, err)
 	t.Cleanup(func() {
 		err = services.StopAndAwaitTerminated(context.Background(), builder)
@@ -240,6 +242,14 @@ func (f fakeLimits) BloomCompactorMaxBloomSize(_ string) int {
 	panic("implement me")
 }
 
+type fakeBloomStore struct {
+	bloomshipper.Store
+}
+
+func (f fakeBloomStore) BloomMetrics() *v1.Metrics {
+	return nil
+}
+
 func parseDayTime(s string) config.DayTime {
 	t, err := time.Parse("2006-01-02", s)
 	if err != nil {
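To keep the test compiling against the wider bloomshipper.Store interface, the PR uses the standard Go trick of embedding the interface in a fake and overriding only the method the constructor calls. A self-contained version of the pattern, with hypothetical Store and Metrics stand-ins instead of the real bloomshipper and v1 types:

```go
package fakestore

// Metrics and Store stand in for v1.Metrics and bloomshipper.Store;
// the real interface has many more methods than shown here.
type Metrics struct{}

type Store interface {
	BloomMetrics() *Metrics
	FetchBlocks(refs []string) error // representative of methods the test never calls
}

// fakeBloomStore embeds the interface, so the struct satisfies Store without
// implementing every method. Calling an unimplemented method panics with a
// nil-pointer dereference, which is acceptable in tests that never reach it.
type fakeBloomStore struct {
	Store
}

// BloomMetrics overrides the one method the code under test touches.
func (f fakeBloomStore) BloomMetrics() *Metrics { return nil }

// asStore shows the fake can be passed wherever a Store is expected.
func asStore() Store { return fakeBloomStore{} }
```

Embedding keeps the fake compiling as the interface grows; any method that is never overridden simply panics if a test happens to call it, which surfaces the gap immediately.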
4 changes: 1 addition & 3 deletions pkg/bloombuild/builder/metrics.go
@@ -16,8 +16,7 @@ const (
 )
 
 type Metrics struct {
-	bloomMetrics *v1.Metrics
-	running      prometheus.Gauge
+	running prometheus.Gauge
 
 	taskStarted   prometheus.Counter
 	taskCompleted *prometheus.CounterVec
@@ -35,7 +34,6 @@ type Metrics struct {
 
 func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics {
 	return &Metrics{
-		bloomMetrics: bloomMetrics,
 		running: promauto.With(r).NewGauge(prometheus.GaugeOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
14 changes: 7 additions & 7 deletions pkg/bloombuild/builder/spec.go
@@ -49,7 +49,7 @@ type SimpleBloomGenerator struct {
 	// options to build blocks with
 	opts v1.BlockOptions
 
-	metrics *Metrics
+	metrics *v1.Metrics
 	logger  log.Logger
 
 	readWriterFn func() (v1.BlockWriter, v1.BlockReader)
@@ -70,7 +70,7 @@ func NewSimpleBloomGenerator(
 	blocksIter v1.ResettableIterator[*v1.SeriesWithBlooms],
 	readWriterFn func() (v1.BlockWriter, v1.BlockReader),
 	reporter func(model.Fingerprint),
-	metrics *Metrics,
+	metrics *v1.Metrics,
 	logger log.Logger,
 ) *SimpleBloomGenerator {
 	return &SimpleBloomGenerator{
@@ -92,7 +92,7 @@
 			opts.Schema.NGramLen(),
 			opts.Schema.NGramSkip(),
 			int(opts.UnencodedBlockOptions.MaxBloomSizeBytes),
-			metrics.bloomMetrics,
+			metrics,
 		),
 	}
 }
@@ -163,7 +163,7 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) *LazyBlockBuilderIt
 type LazyBlockBuilderIterator struct {
 	ctx          context.Context
 	opts         v1.BlockOptions
-	metrics      *Metrics
+	metrics      *v1.Metrics
 	populate     v1.BloomPopulatorFunc
 	readWriterFn func() (v1.BlockWriter, v1.BlockReader)
 	series       v1.PeekingIterator[*v1.Series]
@@ -177,7 +177,7 @@ type LazyBlockBuilderIterator struct {
 func NewLazyBlockBuilderIterator(
 	ctx context.Context,
 	opts v1.BlockOptions,
-	metrics *Metrics,
+	metrics *v1.Metrics,
 	populate v1.BloomPopulatorFunc,
 	readWriterFn func() (v1.BlockWriter, v1.BlockReader),
 	series v1.PeekingIterator[*v1.Series],
@@ -214,7 +214,7 @@ func (b *LazyBlockBuilderIterator) Next() bool {
 		return false
 	}
 
-	mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate, b.metrics.bloomMetrics)
+	mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate, b.metrics)
 	writer, reader := b.readWriterFn()
 	blockBuilder, err := v1.NewBlockBuilder(b.opts, writer)
 	if err != nil {
@@ -229,7 +229,7 @@ func (b *LazyBlockBuilderIterator) Next() bool {
 		return false
 	}
 
-	b.curr = v1.NewBlock(reader, b.metrics.bloomMetrics)
+	b.curr = v1.NewBlock(reader, b.metrics)
 	return true
 }
 
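The spec.go changes drop the builder-level Metrics wrapper from the generation path: callers now pass a *v1.Metrics (typically bloomStore.BloomMetrics()) and the same pointer flows through the generator, the lazy iterator, the merge builder, and the finished blocks. A minimal sketch of that threading, using a placeholder Metrics type rather than the real v1 package:

```go
package main

import "fmt"

// Metrics is a placeholder for *v1.Metrics.
type Metrics struct{ owner string }

// SimpleBloomGenerator and LazyBlockBuilderIterator mirror the new shape:
// both hold the shared metrics pointer directly.
type SimpleBloomGenerator struct{ metrics *Metrics }

type LazyBlockBuilderIterator struct{ metrics *Metrics }

type Block struct{ metrics *Metrics }

func NewSimpleBloomGenerator(metrics *Metrics) *SimpleBloomGenerator {
	return &SimpleBloomGenerator{metrics: metrics}
}

// Generate passes the generator's metrics straight to the iterator it builds.
func (g *SimpleBloomGenerator) Generate() *LazyBlockBuilderIterator {
	return &LazyBlockBuilderIterator{metrics: g.metrics}
}

// Next would hand the same pointer to each block it materializes.
func (it *LazyBlockBuilderIterator) Next() *Block {
	return &Block{metrics: it.metrics}
}

func main() {
	shared := &Metrics{owner: "bloom store"}
	block := NewSimpleBloomGenerator(shared).Generate().Next()
	fmt.Println("block uses the store's metrics:", block.metrics == shared) // true
}
```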
2 changes: 1 addition & 1 deletion pkg/bloombuild/builder/spec_test.go
@@ -107,7 +107,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser
 			return v1.NewMemoryBlockWriter(indexBuf, bloomsBuf), v1.NewByteReader(indexBuf, bloomsBuf)
 		},
 		nil,
-		NewMetrics(nil, v1.NewMetrics(nil)),
+		v1.NewMetrics(nil),
 		log.NewNopLogger(),
 	)
 }
9 changes: 1 addition & 8 deletions pkg/bloombuild/planner/metrics.go
@@ -26,7 +26,6 @@ type Metrics struct {
 	inflightRequests prometheus.Summary
 	tasksRequeued    prometheus.Counter
 	taskLost         prometheus.Counter
-	tasksFailed      prometheus.Counter
 
 	buildStarted   prometheus.Counter
 	buildCompleted *prometheus.CounterVec
@@ -86,12 +85,6 @@ func NewMetrics(
 			Name: "tasks_lost_total",
 			Help: "Total number of tasks lost due to not being picked up by a builder and failed to be requeued.",
 		}),
-		tasksFailed: promauto.With(r).NewCounter(prometheus.CounterOpts{
-			Namespace: metricsNamespace,
-			Subsystem: metricsSubsystem,
-			Name:      "tasks_failed_total",
-			Help:      "Total number of tasks that failed to be processed by builders (after the configured retries).",
-		}),
 
 		buildStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
 			Namespace: metricsNamespace,
@@ -149,7 +142,7 @@ func NewMetrics(
 			Subsystem: metricsSubsystem,
 			Name:      "tenant_tasks_completed",
 			Help:      "Number of tasks completed for a tenant during the current build iteration.",
-		}, []string{"tenant"}),
+		}, []string{"tenant", "status"}),
 	}
 }
 
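On the planner side, the dedicated tasks_failed_total counter is folded into tenant_tasks_completed by adding a status label, so success and failure share one metric name. The snippet below recreates that shape with the Prometheus client library; the namespace and subsystem strings and the success/failure label values are assumptions standing in for the planner's own constants, and the Set(0) calls mirror the per-iteration reset the planner performs.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Assumed label values; the diff references statusSuccess and statusFailure
// constants defined elsewhere in the planner package.
const (
	statusSuccess = "success"
	statusFailure = "failure"
)

func main() {
	reg := prometheus.NewRegistry()

	// One vector with a "status" label replaces the separate tasks_failed_total
	// counter. Namespace and subsystem are placeholders, not Loki's constants.
	tenantTasksCompleted := promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "loki",
		Subsystem: "bloomplanner",
		Name:      "tenant_tasks_completed",
		Help:      "Number of tasks completed for a tenant during the current build iteration.",
	}, []string{"tenant", "status"})

	// At the start of a build iteration both series are reset, then the one
	// that applies is incremented as tasks finish or exhaust their retries.
	tenantTasksCompleted.WithLabelValues("tenant-a", statusSuccess).Set(0)
	tenantTasksCompleted.WithLabelValues("tenant-a", statusFailure).Set(0)
	tenantTasksCompleted.WithLabelValues("tenant-a", statusSuccess).Inc()

	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		fmt.Println(mf.GetName(), "has", len(mf.GetMetric()), "series")
	}
}
```

Keeping both outcomes under one name also makes it straightforward to aggregate or ratio them by status in queries.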
7 changes: 4 additions & 3 deletions pkg/bloombuild/planner/planner.go
@@ -524,7 +524,8 @@ func (p *Planner) loadTenantWork(
 			// NOTE(salvacorts): We will reset them multiple times for the same tenant, for each table, but it's not a big deal.
 			// Alternatively, we can use a Counter instead of a Gauge, but I think a Gauge is easier to reason about.
 			p.metrics.tenantTasksPlanned.WithLabelValues(tenant).Set(0)
-			p.metrics.tenantTasksCompleted.WithLabelValues(tenant).Set(0)
+			p.metrics.tenantTasksCompleted.WithLabelValues(tenant, statusSuccess).Set(0)
+			p.metrics.tenantTasksCompleted.WithLabelValues(tenant, statusFailure).Set(0)
 
 			level.Debug(p.logger).Log("msg", "loading work for tenant", "table", table, "tenant", tenant, "splitFactor", splitFactor)
 		}
@@ -799,7 +800,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
 		if err != nil {
 			maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant)
 			if maxRetries > 0 && int(task.timesEnqueued.Load()) >= maxRetries {
-				p.metrics.tasksFailed.Inc()
+				p.metrics.tenantTasksCompleted.WithLabelValues(task.Tenant, statusFailure).Inc()
 				p.removePendingTask(task)
 				level.Error(logger).Log(
 					"msg", "task failed after max retries",
@@ -841,7 +842,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
 			"retries", task.timesEnqueued.Load()-1, // -1 because the first enqueue is not a retry
 		)
 		p.removePendingTask(task)
-		p.metrics.tenantTasksCompleted.WithLabelValues(task.Tenant).Inc()
+		p.metrics.tenantTasksCompleted.WithLabelValues(task.Tenant, statusSuccess).Inc()
 
 		// Send the result back to the task. The channel is buffered, so this should not block.
 		task.resultsChannel <- result
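In planner.go the retry-exhausted branch now records a failure on the same tenantTasksCompleted series instead of the removed tasksFailed counter, while clean completions record a success. The outcome decision itself is easy to isolate; the sketch below is a simplified illustration (the task struct, resolveStatus helper, and status strings are not the actual planner code).

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// Assumed status label values; the diff references statusSuccess and
// statusFailure constants defined elsewhere in the planner package.
const (
	statusSuccess = "success"
	statusFailure = "failure"
)

// task is a minimal stand-in for the planner's queue entry.
type task struct {
	Tenant        string
	timesEnqueued atomic.Int32
}

// resolveStatus mirrors the branch in BuilderLoop: an errored task that has
// exhausted its retries is recorded as a failed completion, an errored task
// with retries left is requeued, and a clean result is a successful completion.
func resolveStatus(t *task, err error, maxRetries int) (status string, requeue bool) {
	if err != nil {
		if maxRetries > 0 && int(t.timesEnqueued.Load()) >= maxRetries {
			return statusFailure, false
		}
		return "", true
	}
	return statusSuccess, false
}

func main() {
	t := &task{Tenant: "tenant-a"}
	t.timesEnqueued.Add(3) // already enqueued three times

	status, requeue := resolveStatus(t, errors.New("builder disconnected"), 3)
	fmt.Printf("tenant=%s status=%q requeue=%v\n", t.Tenant, status, requeue) // status="failure" requeue=false
}
```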