Skip to content

Commit

Permalink
[Rhythm] Metrics generator read from kafka first pass (grafana#4359)
Browse files Browse the repository at this point in the history
* Metrics generator read from kafka first pass

* review feedback
  • Loading branch information
mdisibio authored and mapno committed Jan 10, 2025
1 parent d950d28 commit e810eab
Show file tree
Hide file tree
Showing 7 changed files with 325 additions and 9 deletions.
13 changes: 11 additions & 2 deletions cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,16 @@ func (t *App) initGenerator() (services.Service, error) {
}

t.cfg.Generator.Ring.ListenPort = t.cfg.Server.GRPCListenPort
genSvc, err := generator.New(&t.cfg.Generator, t.Overrides, prometheus.DefaultRegisterer, t.store, log.Logger)

t.cfg.Generator.Ingest = t.cfg.Ingest
t.cfg.Generator.Ingest.Kafka.ConsumerGroup = generator.ConsumerGroup

if t.cfg.Target == SingleBinary && len(t.cfg.Generator.AssignedPartitions) == 0 {
// In SingleBinary mode always use partition 0. This is for small installs or local/debugging setups.
t.cfg.Generator.AssignedPartitions = append(t.cfg.Generator.AssignedPartitions, 0)
}

genSvc, err := generator.New(&t.cfg.Generator, t.Overrides, prometheus.DefaultRegisterer, t.partitionRing, t.store, log.Logger)
if errors.Is(err, generator.ErrUnconfigured) && t.cfg.Target != MetricsGenerator { // just warn if we're not running the metrics-generator
level.Warn(log.Logger).Log("msg", "metrics-generator is not configured.", "err", err)
return services.NewIdleService(nil, nil), nil
Expand Down Expand Up @@ -666,7 +675,7 @@ func (t *App) setupModuleManager() error {
QueryFrontend: {Common, Store, OverridesAPI},
Distributor: {Common, IngesterRing, MetricsGeneratorRing, PartitionRing},
Ingester: {Common, Store, MemberlistKV, PartitionRing},
MetricsGenerator: {Common, OptionalStore, MemberlistKV, BlockBuilder},
MetricsGenerator: {Common, OptionalStore, MemberlistKV, BlockBuilder, PartitionRing},
Querier: {Common, Store, IngesterRing, MetricsGeneratorRing, SecondaryIngesterRing},
Compactor: {Common, Store, MemberlistKV},
BlockBuilder: {Common, Store, MemberlistKV, PartitionRing},
Expand Down
17 changes: 17 additions & 0 deletions modules/generator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/grafana/tempo/modules/generator/processor/spanmetrics"
"github.com/grafana/tempo/modules/generator/registry"
"github.com/grafana/tempo/modules/generator/storage"
"github.com/grafana/tempo/pkg/ingest"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/wal"
)
Expand All @@ -20,6 +21,8 @@ const (

// ringNameForServer is the name of the ring used by the metrics-generator server.
ringNameForServer = "metrics-generator"

ConsumerGroup = "metrics-generator"
)

// Config for a generator.
Expand All @@ -34,6 +37,10 @@ type Config struct {
MetricsIngestionSlack time.Duration `yaml:"metrics_ingestion_time_range_slack"`
QueryTimeout time.Duration `yaml:"query_timeout"`
OverrideRingKey string `yaml:"override_ring_key"`

// This config is dynamically injected because defined outside the generator config.
Ingest ingest.Config `yaml:"-"`
AssignedPartitions []int32 `yaml:"assigned_partitions" doc:"List of partitions assigned to this block builder."`
}

// RegisterFlagsAndApplyDefaults registers the flags.
Expand All @@ -51,6 +58,16 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.OverrideRingKey = generatorRingKey
}

func (cfg *Config) Validate() error {
if cfg.Ingest.Enabled {
if err := cfg.Ingest.Kafka.Validate(); err != nil {
return err
}
}

return nil
}

type ProcessorConfig struct {
ServiceGraphs servicegraphs.Config `yaml:"service_graphs"`
SpanMetrics spanmetrics.Config `yaml:"span_metrics"`
Expand Down
64 changes: 59 additions & 5 deletions modules/generator/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,21 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.uber.org/atomic"

"github.com/grafana/tempo/modules/generator/storage"
objStorage "github.com/grafana/tempo/modules/storage"
"github.com/grafana/tempo/pkg/ingest"
"github.com/grafana/tempo/pkg/tempopb"
tempodb_wal "github.com/grafana/tempo/tempodb/wal"
)
Expand Down Expand Up @@ -65,14 +69,24 @@ type Generator struct {

reg prometheus.Registerer
logger log.Logger

kafkaWG sync.WaitGroup
kafkaStop chan struct{}
kafkaClient *kgo.Client
kafkaAdm *kadm.Client
partitionRing ring.PartitionRingReader
}

// New makes a new Generator.
func New(cfg *Config, overrides metricsGeneratorOverrides, reg prometheus.Registerer, store objStorage.Store, logger log.Logger) (*Generator, error) {
func New(cfg *Config, overrides metricsGeneratorOverrides, reg prometheus.Registerer, partitionRing ring.PartitionRingReader, store objStorage.Store, logger log.Logger) (*Generator, error) {
if cfg.Storage.Path == "" {
return nil, ErrUnconfigured
}

if err := cfg.Validate(); err != nil {
return nil, err
}

err := os.MkdirAll(cfg.Storage.Path, 0o700)
if err != nil {
return nil, fmt.Errorf("failed to mkdir on %s: %w", cfg.Storage.Path, err)
Expand All @@ -84,10 +98,10 @@ func New(cfg *Config, overrides metricsGeneratorOverrides, reg prometheus.Regist

instances: map[string]*instance{},

store: store,

reg: reg,
logger: logger,
store: store,
partitionRing: partitionRing,
reg: reg,
logger: logger,
}

// Lifecycler and ring
Expand Down Expand Up @@ -147,10 +161,45 @@ func (g *Generator) starting(ctx context.Context) (err error) {
return fmt.Errorf("unable to start metrics-generator dependencies: %w", err)
}

if g.cfg.Ingest.Enabled {
g.kafkaClient, err = ingest.NewReaderClient(
g.cfg.Ingest.Kafka,
ingest.NewReaderClientMetrics("generator", nil),
g.logger,
)
if err != nil {
return fmt.Errorf("failed to create kafka reader client: %w", err)
}

boff := backoff.New(ctx, backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: time.Minute, // If there is a network hiccup, we prefer to wait longer retrying, than fail the service.
MaxRetries: 10,
})

for boff.Ongoing() {
err := g.kafkaClient.Ping(ctx)
if err == nil {
break
}
level.Warn(g.logger).Log("msg", "ping kafka; will retry", "err", err)
boff.Wait()
}
if err := boff.ErrCause(); err != nil {
return fmt.Errorf("failed to ping kafka: %w", err)
}

g.kafkaAdm = kadm.NewClient(g.kafkaClient)
}

return nil
}

func (g *Generator) running(ctx context.Context) error {
if g.cfg.Ingest.Enabled {
g.startKafka()
}

for {
select {
case <-ctx.Done():
Expand All @@ -175,6 +224,11 @@ func (g *Generator) stopping(_ error) error {
// Mark as read-only after we have removed ourselves from the ring
g.stopIncomingRequests()

// Stop reading from queue and wait for oustanding data to be processed and committed
if g.cfg.Ingest.Enabled {
g.stopKafka()
}

var wg sync.WaitGroup
wg.Add(len(g.instances))

Expand Down
Loading

0 comments on commit e810eab

Please sign in to comment.