block-builder: Add basic series limit #9983

Merged · 5 commits · Nov 27, 2024
2 changes: 1 addition & 1 deletion pkg/blockbuilder/blockbuilder.go
@@ -295,7 +295,7 @@ func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, st

logger := log.With(sp, "partition", partition, "cycle_end", cycleEndTime, "cycle_end_offset", cycleEndOffset)

builder := NewTSDBBuilder(b.logger, b.cfg.DataDir, b.cfg.BlocksStorage, b.limits, b.tsdbBuilderMetrics)
builder := NewTSDBBuilder(b.logger, b.cfg.DataDir, b.cfg.BlocksStorage, b.limits, b.tsdbBuilderMetrics, b.cfg.ApplyMaxGlobalSeriesPerUserBelow)
defer runutil.CloseWithErrCapture(&err, builder, "closing tsdb builder")

// Section is a portion of the partition to process in a single pass. One cycle may process multiple sections if the partition is lagging.
3 changes: 3 additions & 0 deletions pkg/blockbuilder/config.go
@@ -26,6 +26,8 @@ type Config struct {
ConsumeIntervalBuffer time.Duration `yaml:"consume_interval_buffer"`
LookbackOnNoCommit time.Duration `yaml:"lookback_on_no_commit" category:"advanced"`

ApplyMaxGlobalSeriesPerUserBelow int `yaml:"apply_max_global_series_per_user_below" category:"experimental"`

// Config parameters defined outside the block-builder config and injected dynamically.
Kafka ingest.KafkaConfig `yaml:"-"`
BlocksStorage tsdb.BlocksStorageConfig `yaml:"-"`
@@ -45,6 +47,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.DurationVar(&cfg.ConsumeInterval, "block-builder.consume-interval", time.Hour, "Interval between consumption cycles.")
f.DurationVar(&cfg.ConsumeIntervalBuffer, "block-builder.consume-interval-buffer", 15*time.Minute, "Extra buffer between subsequent consumption cycles. To avoid small blocks the block-builder consumes until the last hour boundary of the consumption interval, plus the buffer.")
f.DurationVar(&cfg.LookbackOnNoCommit, "block-builder.lookback-on-no-commit", 12*time.Hour, "How much of the historical records to look back when there is no kafka commit for a partition.")
f.IntVar(&cfg.ApplyMaxGlobalSeriesPerUserBelow, "block-builder.apply-max-global-series-per-user-below", 0, "Apply the global series limit per partition if the user's global series limit is less than or equal to this value. 0 disables the limit entirely. If a user's limit is higher than this value, no limit is applied for that user.")
}
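
To make the gating rule concrete, here is a minimal sketch of the decision the flag implies; shouldApplySeriesLimit is a hypothetical helper for illustration, not part of the PR:

```go
package main

import "fmt"

// shouldApplySeriesLimit mirrors the gating described by the flag help text.
// applyBelow is the value of -block-builder.apply-max-global-series-per-user-below.
func shouldApplySeriesLimit(userLimit, applyBelow int) bool {
	// A threshold of 0 disables enforcement entirely. Otherwise the user's
	// global series limit is enforced only when it is at or below the
	// threshold; users with a limit of 0 (unlimited) are never capped.
	return applyBelow > 0 && userLimit > 0 && userLimit <= applyBelow
}

func main() {
	fmt.Println(shouldApplySeriesLimit(30, 50))  // true: 30 <= 50, limit applied
	fmt.Println(shouldApplySeriesLimit(150, 50)) // false: 150 > 50, no limit
	fmt.Println(shouldApplySeriesLimit(30, 0))   // false: threshold 0 disables
}
```

The first two cases mirror user1 and user2 in TestTSDBBuilderLimits below.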

func (cfg *Config) Validate() error {
71 changes: 52 additions & 19 deletions pkg/blockbuilder/tsdb.go
@@ -4,7 +4,6 @@ package blockbuilder

import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
@@ -16,10 +15,12 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/multierror"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
@@ -34,10 +35,11 @@ import (
type TSDBBuilder struct {
dataDir string

logger log.Logger
limits *validation.Overrides
blocksStorageCfg mimir_tsdb.BlocksStorageConfig
metrics tsdbBuilderMetrics
logger log.Logger
limits *validation.Overrides
blocksStorageCfg mimir_tsdb.BlocksStorageConfig
metrics tsdbBuilderMetrics
applyMaxGlobalSeriesPerUserBelow int // inclusive

// Map of a tenant in a partition to its TSDB.
tsdbsMu sync.RWMutex
@@ -58,14 +60,15 @@ type tsdbTenant struct {
tenantID string
}

func NewTSDBBuilder(logger log.Logger, dataDir string, blocksStorageCfg mimir_tsdb.BlocksStorageConfig, limits *validation.Overrides, metrics tsdbBuilderMetrics) *TSDBBuilder {
func NewTSDBBuilder(logger log.Logger, dataDir string, blocksStorageCfg mimir_tsdb.BlocksStorageConfig, limits *validation.Overrides, metrics tsdbBuilderMetrics, applyMaxGlobalSeriesPerUserBelow int) *TSDBBuilder {
return &TSDBBuilder{
dataDir: dataDir,
logger: logger,
limits: limits,
blocksStorageCfg: blocksStorageCfg,
metrics: metrics,
tsdbs: make(map[tsdbTenant]*userTSDB),
dataDir: dataDir,
logger: logger,
limits: limits,
blocksStorageCfg: blocksStorageCfg,
metrics: metrics,
applyMaxGlobalSeriesPerUserBelow: applyMaxGlobalSeriesPerUserBelow,
tsdbs: make(map[tsdbTenant]*userTSDB),
}
}

@@ -153,7 +156,7 @@ func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax

if err != nil {
// Only abort the processing on a terminal error.
if !softErrProcessor.ProcessErr(err, 0, nil) {
if !softErrProcessor.ProcessErr(err, 0, nil) && !errors.Is(err, errMaxInMemorySeriesReached) {
return false, err
}
discardedSamples++
@@ -197,7 +200,7 @@

if err != nil {
// Only abort the processing on a terminal error.
if !softErrProcessor.ProcessErr(err, 0, nil) {
if !softErrProcessor.ProcessErr(err, 0, nil) && !errors.Is(err, errMaxInMemorySeriesReached) {
return false, err
}
discardedSamples++
@@ -256,6 +259,20 @@ func (b *TSDBBuilder) newTSDB(tenant tsdbTenant) (*userTSDB, error) {
userID := tenant.tenantID
userLogger := util_log.WithUserID(userID, b.logger)

udb := &userTSDB{
userID: userID,
}

// Until we have a better way to enforce the same limits between ingesters and block-builders,
// as a stopgap we apply the limit only when it is under a given value. The reason is that when
// a tenant has higher limits, higher usage and growth are expected, capacity is planned
// accordingly, and the tenant is generally more careful. It is the smaller tenants that can
// create problems if they suddenly send millions of series when they are supposed to be
// limited to a few thousand.
userLimit := b.limits.MaxGlobalSeriesPerUser(userID)
if userLimit <= b.applyMaxGlobalSeriesPerUserBelow {
udb.maxGlobalSeries = userLimit
}

db, err := tsdb.Open(udir, util_log.SlogFromGoKit(userLogger), nil, &tsdb.Options{
RetentionDuration: 0,
MinBlockDuration: 2 * time.Hour.Milliseconds(),
@@ -272,17 +289,15 @@
OutOfOrderCapMax: int64(b.blocksStorageCfg.TSDB.OutOfOrderCapacityMax),
EnableNativeHistograms: b.limits.NativeHistogramsIngestionEnabled(userID),
SecondaryHashFunction: nil, // TODO(codesome): May be needed when applying limits. Used to determine the series owned by an ingester.
SeriesLifecycleCallback: udb,
}, nil)
if err != nil {
return nil, err
}

db.DisableCompactions()

udb := &userTSDB{
DB: db,
userID: userID,
}
udb.DB = db

return udb, nil
}
@@ -406,9 +421,27 @@ type extendedAppender interface {

type userTSDB struct {
*tsdb.DB
userID string
userID string
maxGlobalSeries int
}

var (
errMaxInMemorySeriesReached = errors.New("max series for user reached")
)

func (u *userTSDB) PreCreation(labels.Labels) error {
// Global series limit.
if u.maxGlobalSeries > 0 && u.Head().NumSeries() >= uint64(u.maxGlobalSeries) {
return errors.Wrapf(errMaxInMemorySeriesReached, "limit of %d reached for user %s", u.maxGlobalSeries, u.userID)
}

return nil
}

func (u *userTSDB) PostCreation(labels.Labels) {}

func (u *userTSDB) PostDeletion(map[chunks.HeadSeriesRef]labels.Labels) {}
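
Two details make the enforcement above work end to end. First, the TSDB head calls the SeriesLifecycleCallback's PreCreation hook before creating a series in the head, and a non-nil error makes the corresponding Append fail with that error. Second, errors wrapped with github.com/pkg/errors (v0.9+) implement Unwrap, so the errors.Is checks in Process still match the sentinel and treat it as a soft error. A minimal standalone sketch of that matching, for illustration only, not the PR's code:

```go
package main

import (
	"fmt"

	"github.com/pkg/errors"
)

var errMaxInMemorySeriesReached = errors.New("max series for user reached")

func main() {
	// errors.Wrapf preserves the cause and implements Unwrap, so the
	// sentinel still matches after wrapping. This is what lets Process
	// count the rejected sample as discarded instead of aborting the
	// whole record.
	err := errors.Wrapf(errMaxInMemorySeriesReached, "limit of %d reached for user %s", 30, "user1")
	fmt.Println(errors.Is(err, errMaxInMemorySeriesReached)) // prints: true
}
```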

func (u *userTSDB) compactEverything(ctx context.Context) error {
blockRange := 2 * time.Hour.Milliseconds()

77 changes: 75 additions & 2 deletions pkg/blockbuilder/tsdb_test.go
@@ -160,7 +160,7 @@ func TestTSDBBuilder(t *testing.T) {
expSamples = expSamples[:0]
expHistograms = expHistograms[:0]
metrics := newTSDBBBuilderMetrics(prometheus.NewPedanticRegistry())
builder := NewTSDBBuilder(log.NewNopLogger(), t.TempDir(), mimir_tsdb.BlocksStorageConfig{}, overrides, metrics)
builder := NewTSDBBuilder(log.NewNopLogger(), t.TempDir(), mimir_tsdb.BlocksStorageConfig{}, overrides, metrics, 0)

currEnd, lastEnd := tc.currEnd, tc.lastEnd
{ // Add float samples.
@@ -350,7 +350,7 @@ func TestProcessingEmptyRequest(t *testing.T) {
overrides, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
metrics := newTSDBBBuilderMetrics(prometheus.NewPedanticRegistry())
builder := NewTSDBBuilder(log.NewNopLogger(), t.TempDir(), mimir_tsdb.BlocksStorageConfig{}, overrides, metrics)
builder := NewTSDBBuilder(log.NewNopLogger(), t.TempDir(), mimir_tsdb.BlocksStorageConfig{}, overrides, metrics, 0)

// Has a timeseries with no samples.
var rec kgo.Record
@@ -381,6 +381,79 @@
require.NoError(t, builder.tsdbs[tsdbTenant{0, userID}].Close())
}

// TestTSDBBuilderLimits tests the correct enforcement of series limits and also
// that a series-limit error does not cause the processing to fail (i.e., it does not error out).
func TestTSDBBuilderLimits(t *testing.T) {
var (
user1 = "user1"
user2 = "user2"
// Limits should be applied only if the user's limit is 50 or lower.
applyGlobalSeriesLimitUnder = 50
)

limits := map[string]*validation.Limits{
user1: {
MaxGlobalSeriesPerUser: 30,
NativeHistogramsIngestionEnabled: true,
},
user2: {
MaxGlobalSeriesPerUser: 150,
NativeHistogramsIngestionEnabled: true,
},
}
overrides, err := validation.NewOverrides(defaultLimitsTestConfig(), validation.NewMockTenantLimits(limits))
require.NoError(t, err)

metrics := newTSDBBBuilderMetrics(prometheus.NewPedanticRegistry())
builder := NewTSDBBuilder(log.NewNopLogger(), t.TempDir(), mimir_tsdb.BlocksStorageConfig{}, overrides, metrics, applyGlobalSeriesLimitUnder)
t.Cleanup(func() {
require.NoError(t, builder.Close())
})

var (
processingRange = time.Hour.Milliseconds()
lastEnd = 2 * processingRange
currEnd = 3 * processingRange
ts = lastEnd + (processingRange / 2)
)
createRequest := func(userID string, seriesID int) *kgo.Record {
var (
samples []mimirpb.Sample
histograms []mimirpb.Histogram
)
if seriesID%2 == 0 {
samples = floatSample(ts, float64(seriesID))
} else {
histograms = histogramSample(ts)
}
return &kgo.Record{
Key: []byte(userID),
Value: createWriteRequest(t, strconv.Itoa(seriesID), samples, histograms),
}
}

for seriesID := 1; seriesID <= 100; seriesID++ {
for userID := range limits {
rec := createRequest(userID, seriesID)
allProcessed, err := builder.Process(context.Background(), rec, lastEnd, currEnd, false)
require.NoError(t, err)
require.Equal(t, true, allProcessed)
}
}

// user1 had a limit of 30, which is less than applyGlobalSeriesLimitUnder.
// So the limit must be applied.
db, err := builder.getOrCreateTSDB(tsdbTenant{tenantID: user1})
require.NoError(t, err)
require.Equal(t, uint64(30), db.Head().NumSeries())

// user2 had a limit of 150, which is greater than applyGlobalSeriesLimitUnder.
// So the limit must not be applied.
db, err = builder.getOrCreateTSDB(tsdbTenant{tenantID: user2})
require.NoError(t, err)
require.Equal(t, uint64(100), db.Head().NumSeries())
}

func defaultLimitsTestConfig() validation.Limits {
limits := validation.Limits{}
flagext.DefaultValues(&limits)
Expand Down