block-builder: Add basic series limit (#9983)
* Series limit for block builder

Signed-off-by: Ganesh Vernekar <[email protected]>

* Add TestTSDBBuilderLimits to test the limits

Signed-off-by: Ganesh Vernekar <[email protected]>

* Make the flag experimental

Signed-off-by: Ganesh Vernekar <[email protected]>

* Update flag and add comment

Signed-off-by: Ganesh Vernekar <[email protected]>

---------

Signed-off-by: Ganesh Vernekar <[email protected]>
codesome authored Nov 27, 2024
1 parent 832d02b commit b941204
Showing 4 changed files with 131 additions and 22 deletions.
2 changes: 1 addition & 1 deletion pkg/blockbuilder/blockbuilder.go
@@ -295,7 +295,7 @@ func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, st

logger := log.With(sp, "partition", partition, "cycle_end", cycleEndTime, "cycle_end_offset", cycleEndOffset)

- builder := NewTSDBBuilder(b.logger, b.cfg.DataDir, b.cfg.BlocksStorage, b.limits, b.tsdbBuilderMetrics)
+ builder := NewTSDBBuilder(b.logger, b.cfg.DataDir, b.cfg.BlocksStorage, b.limits, b.tsdbBuilderMetrics, b.cfg.ApplyMaxGlobalSeriesPerUserBelow)
defer runutil.CloseWithErrCapture(&err, builder, "closing tsdb builder")

// Section is a portion of the partition to process in a single pass. One cycle may process multiple sections if the partition is lagging.
3 changes: 3 additions & 0 deletions pkg/blockbuilder/config.go
@@ -26,6 +26,8 @@ type Config struct {
ConsumeIntervalBuffer time.Duration `yaml:"consume_interval_buffer"`
LookbackOnNoCommit time.Duration `yaml:"lookback_on_no_commit" category:"advanced"`

+ ApplyMaxGlobalSeriesPerUserBelow int `yaml:"apply_max_global_series_per_user_below" category:"experimental"`
+
// Config parameters defined outside the block-builder config and are injected dynamically.
Kafka ingest.KafkaConfig `yaml:"-"`
BlocksStorage tsdb.BlocksStorageConfig `yaml:"-"`
@@ -45,6 +47,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.DurationVar(&cfg.ConsumeInterval, "block-builder.consume-interval", time.Hour, "Interval between consumption cycles.")
f.DurationVar(&cfg.ConsumeIntervalBuffer, "block-builder.consume-interval-buffer", 15*time.Minute, "Extra buffer between subsequent consumption cycles. To avoid small blocks the block-builder consumes until the last hour boundary of the consumption interval, plus the buffer.")
f.DurationVar(&cfg.LookbackOnNoCommit, "block-builder.lookback-on-no-commit", 12*time.Hour, "How much of the historical records to look back when there is no kafka commit for a partition.")
+ f.IntVar(&cfg.ApplyMaxGlobalSeriesPerUserBelow, "block-builder.apply-max-global-series-per-user-below", 0, "Apply the global series limit per partition if the global series limit for the user is <= this given value. 0 means limits are disabled. If a user's limit is above this value, the limit is not applied.")
}

func (cfg *Config) Validate() error {
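For reference, here is a sketch of how an operator might turn this on. The flag name and the YAML key are taken verbatim from the diff above; the enclosing block_builder: YAML section and the -target=block-builder invocation are assumptions, since neither appears in this commit.

# CLI: cap per-partition series for tenants whose global limit is <= 50000 (target name assumed)
mimir -target=block-builder -block-builder.apply-max-global-series-per-user-below=50000

# YAML equivalent (parent key assumed)
block_builder:
  apply_max_global_series_per_user_below: 50000

With that value, a tenant whose max_global_series_per_user is 30000 gets capped by the block-builder, while a tenant with a 150000 limit is left unlimited, matching the rationale in the tsdb.go comment below.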
71 changes: 52 additions & 19 deletions pkg/blockbuilder/tsdb.go
@@ -4,7 +4,6 @@ package blockbuilder

import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
@@ -16,10 +15,12 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/multierror"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
@@ -34,10 +35,11 @@ import (
type TSDBBuilder struct {
dataDir string

- logger log.Logger
- limits *validation.Overrides
- blocksStorageCfg mimir_tsdb.BlocksStorageConfig
- metrics tsdbBuilderMetrics
+ logger log.Logger
+ limits *validation.Overrides
+ blocksStorageCfg mimir_tsdb.BlocksStorageConfig
+ metrics tsdbBuilderMetrics
+ applyMaxGlobalSeriesPerUserBelow int // inclusive

// Map of a tenant in a partition to its TSDB.
tsdbsMu sync.RWMutex
@@ -58,14 +60,15 @@ type tsdbTenant struct {
tenantID string
}

- func NewTSDBBuilder(logger log.Logger, dataDir string, blocksStorageCfg mimir_tsdb.BlocksStorageConfig, limits *validation.Overrides, metrics tsdbBuilderMetrics) *TSDBBuilder {
+ func NewTSDBBuilder(logger log.Logger, dataDir string, blocksStorageCfg mimir_tsdb.BlocksStorageConfig, limits *validation.Overrides, metrics tsdbBuilderMetrics, applyMaxGlobalSeriesPerUserBelow int) *TSDBBuilder {
return &TSDBBuilder{
- dataDir: dataDir,
- logger: logger,
- limits: limits,
- blocksStorageCfg: blocksStorageCfg,
- metrics: metrics,
- tsdbs: make(map[tsdbTenant]*userTSDB),
+ dataDir: dataDir,
+ logger: logger,
+ limits: limits,
+ blocksStorageCfg: blocksStorageCfg,
+ metrics: metrics,
+ applyMaxGlobalSeriesPerUserBelow: applyMaxGlobalSeriesPerUserBelow,
+ tsdbs: make(map[tsdbTenant]*userTSDB),
}
}

Expand Down Expand Up @@ -153,7 +156,7 @@ func (b *TSDBBuilder) Process(ctx context.Context, rec *kgo.Record, lastBlockMax

if err != nil {
// Only abort the processing on a terminal error.
- if !softErrProcessor.ProcessErr(err, 0, nil) {
+ if !softErrProcessor.ProcessErr(err, 0, nil) && !errors.Is(err, errMaxInMemorySeriesReached) {
return false, err
}
discardedSamples++
@@ -197,7 +200,7 @@

if err != nil {
// Only abort the processing on a terminal error.
- if !softErrProcessor.ProcessErr(err, 0, nil) {
+ if !softErrProcessor.ProcessErr(err, 0, nil) && !errors.Is(err, errMaxInMemorySeriesReached) {
return false, err
}
discardedSamples++
@@ -256,6 +259,20 @@ func (b *TSDBBuilder) newTSDB(tenant tsdbTenant) (*userTSDB, error) {
userID := tenant.tenantID
userLogger := util_log.WithUserID(userID, b.logger)

+ udb := &userTSDB{
+ userID: userID,
+ }
+
+ // Until we have a better way to enforce the same limits between ingesters and block-builders,
+ // as a stopgap we apply limits only when they are under a given value. The reason: when a tenant
+ // has higher limits, higher usage and growth are expected, capacity is planned accordingly,
+ // and the tenant is generally more careful. It is the smaller tenants that can create problems
+ // if they suddenly send millions of series when they are supposed to be limited to a few thousand.
+ userLimit := b.limits.MaxGlobalSeriesPerUser(userID)
+ if userLimit <= b.applyMaxGlobalSeriesPerUserBelow {
+ udb.maxGlobalSeries = userLimit
+ }
+
db, err := tsdb.Open(udir, util_log.SlogFromGoKit(userLogger), nil, &tsdb.Options{
RetentionDuration: 0,
MinBlockDuration: 2 * time.Hour.Milliseconds(),
@@ -272,17 +289,15 @@
OutOfOrderCapMax: int64(b.blocksStorageCfg.TSDB.OutOfOrderCapacityMax),
EnableNativeHistograms: b.limits.NativeHistogramsIngestionEnabled(userID),
SecondaryHashFunction: nil, // TODO(codesome): May be needed when applying limits. Used to determine the series owned by an ingester.
+ SeriesLifecycleCallback: udb,
}, nil)
if err != nil {
return nil, err
}

db.DisableCompactions()

- udb := &userTSDB{
- DB: db,
- userID: userID,
- }
+ udb.DB = db

return udb, nil
}
@@ -406,9 +421,27 @@ type extendedAppender interface {

type userTSDB struct {
*tsdb.DB
- userID string
+ userID string
+ maxGlobalSeries int
}

+ var (
+ errMaxInMemorySeriesReached = errors.New("max series for user reached")
+ )
+
+ func (u *userTSDB) PreCreation(labels.Labels) error {
+ // Global series limit.
+ if u.maxGlobalSeries > 0 && u.Head().NumSeries() >= uint64(u.maxGlobalSeries) {
+ return errors.Wrapf(errMaxInMemorySeriesReached, "limit of %d reached for user %s", u.maxGlobalSeries, u.userID)
+ }
+
+ return nil
+ }
+
+ func (u *userTSDB) PostCreation(labels.Labels) {}
+
+ func (u *userTSDB) PostDeletion(map[chunks.HeadSeriesRef]labels.Labels) {}

func (u *userTSDB) compactEverything(ctx context.Context) error {
blockRange := 2 * time.Hour.Milliseconds()

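The enforcement rides on Prometheus TSDB's SeriesLifecycleCallback hook (set via tsdb.Options above): PreCreation runs before each new head series is created, and returning an error rejects only that series, while Process treats the error as soft and just counts the sample as discarded. Since the check compares the per-partition, per-tenant head series count against the tenant's global limit, the effective cap across N partitions can reach N times the configured limit. Below is a standalone sketch, not part of this commit, with illustrative names and values; it demonstrates the one subtlety the diff relies on: errors produced by github.com/pkg/errors.Wrapf still satisfy errors.Is on the sentinel.

package main

import (
	"fmt"

	"github.com/pkg/errors"
)

// Sentinel mirroring errMaxInMemorySeriesReached from the diff above.
var errMaxSeriesReached = errors.New("max series for user reached")

// preCreation mimics userTSDB.PreCreation: once the head already holds
// maxGlobalSeries series, refuse to create any new one.
func preCreation(numHeadSeries uint64, maxGlobalSeries int, userID string) error {
	if maxGlobalSeries > 0 && numHeadSeries >= uint64(maxGlobalSeries) {
		return errors.Wrapf(errMaxSeriesReached, "limit of %d reached for user %s", maxGlobalSeries, userID)
	}
	return nil
}

func main() {
	err := preCreation(30, 30, "user1")
	// pkg/errors (v0.9.0+) implements Unwrap, so errors.Is sees through the
	// wrapping. That is what lets Process keep going instead of aborting.
	fmt.Println(errors.Is(err, errMaxSeriesReached)) // true
}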
77 changes: 75 additions & 2 deletions pkg/blockbuilder/tsdb_test.go
@@ -160,7 +160,7 @@ func TestTSDBBuilder(t *testing.T) {
expSamples = expSamples[:0]
expHistograms = expHistograms[:0]
metrics := newTSDBBBuilderMetrics(prometheus.NewPedanticRegistry())
- builder := NewTSDBBuilder(log.NewNopLogger(), t.TempDir(), mimir_tsdb.BlocksStorageConfig{}, overrides, metrics)
+ builder := NewTSDBBuilder(log.NewNopLogger(), t.TempDir(), mimir_tsdb.BlocksStorageConfig{}, overrides, metrics, 0)

currEnd, lastEnd := tc.currEnd, tc.lastEnd
{ // Add float samples.
@@ -350,7 +350,7 @@ func TestProcessingEmptyRequest(t *testing.T) {
overrides, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
metrics := newTSDBBBuilderMetrics(prometheus.NewPedanticRegistry())
- builder := NewTSDBBuilder(log.NewNopLogger(), t.TempDir(), mimir_tsdb.BlocksStorageConfig{}, overrides, metrics)
+ builder := NewTSDBBuilder(log.NewNopLogger(), t.TempDir(), mimir_tsdb.BlocksStorageConfig{}, overrides, metrics, 0)

// Has a timeseries with no samples.
var rec kgo.Record
@@ -381,6 +381,79 @@
require.NoError(t, builder.tsdbs[tsdbTenant{0, userID}].Close())
}

+ // TestTSDBBuilderLimits tests the correct enforcement of series limits, and also
+ // that a series-limit error does not cause processing to fail (i.e. it does not error out).
+ func TestTSDBBuilderLimits(t *testing.T) {
+ var (
+ user1 = "user1"
+ user2 = "user2"
+ // Limits should be applied only if the limit is under 50.
+ applyGlobalSeriesLimitUnder = 50
+ )

+ limits := map[string]*validation.Limits{
+ user1: {
+ MaxGlobalSeriesPerUser: 30,
+ NativeHistogramsIngestionEnabled: true,
+ },
+ user2: {
+ MaxGlobalSeriesPerUser: 150,
+ NativeHistogramsIngestionEnabled: true,
+ },
+ }
+ overrides, err := validation.NewOverrides(defaultLimitsTestConfig(), validation.NewMockTenantLimits(limits))
+ require.NoError(t, err)
+
+ metrics := newTSDBBBuilderMetrics(prometheus.NewPedanticRegistry())
+ builder := NewTSDBBuilder(log.NewNopLogger(), t.TempDir(), mimir_tsdb.BlocksStorageConfig{}, overrides, metrics, applyGlobalSeriesLimitUnder)
+ t.Cleanup(func() {
+ require.NoError(t, builder.Close())
+ })
+
+ var (
+ processingRange = time.Hour.Milliseconds()
+ lastEnd = 2 * processingRange
+ currEnd = 3 * processingRange
+ ts = lastEnd + (processingRange / 2)
+ )
+ createRequest := func(userID string, seriesID int) *kgo.Record {
+ var (
+ samples []mimirpb.Sample
+ histograms []mimirpb.Histogram
+ )
+ if seriesID%2 == 0 {
+ samples = floatSample(ts, float64(seriesID))
+ } else {
+ histograms = histogramSample(ts)
+ }
+ return &kgo.Record{
+ Key: []byte(userID),
+ Value: createWriteRequest(t, strconv.Itoa(seriesID), samples, histograms),
+ }
+ }
+
+ for seriesID := 1; seriesID <= 100; seriesID++ {
+ for userID := range limits {
+ rec := createRequest(userID, seriesID)
+ allProcessed, err := builder.Process(context.Background(), rec, lastEnd, currEnd, false)
+ require.NoError(t, err)
+ require.Equal(t, true, allProcessed)
+ }
+ }
+
+ // user1 had a limit of 30, which is less than applyGlobalSeriesLimitUnder.
+ // So the limit must be applied.
+ db, err := builder.getOrCreateTSDB(tsdbTenant{tenantID: user1})
+ require.NoError(t, err)
+ require.Equal(t, uint64(30), db.Head().NumSeries())
+
+ // user2 had a limit of 150, which is greater than applyGlobalSeriesLimitUnder.
+ // So the limit must not be applied.
+ db, err = builder.getOrCreateTSDB(tsdbTenant{tenantID: user2})
+ require.NoError(t, err)
+ require.Equal(t, uint64(100), db.Head().NumSeries())
+ }

func defaultLimitsTestConfig() validation.Limits {
limits := validation.Limits{}
flagext.DefaultValues(&limits)
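To run just the new test from a Mimir checkout, standard Go tooling should suffice (package path taken from the file headers above):

go test ./pkg/blockbuilder/ -run TestTSDBBuilderLimits -v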

