diff --git a/cmd/tempo/app/modules.go b/cmd/tempo/app/modules.go index e83d955234b..a68f1337690 100644 --- a/cmd/tempo/app/modules.go +++ b/cmd/tempo/app/modules.go @@ -283,7 +283,12 @@ func (t *App) initIngester() (services.Service, error) { t.cfg.Ingester.LifecyclerConfig.ListenPort = t.cfg.Server.GRPCListenPort t.cfg.Ingester.DedicatedColumns = t.cfg.StorageConfig.Trace.Block.DedicatedColumns t.cfg.Ingester.IngestStorageConfig = t.cfg.Ingest - ingester, err := ingester.New(t.cfg.Ingester, t.store, t.Overrides, prometheus.DefaultRegisterer) + + // In SingleBinary mode don't try to discover parition from host name. Always use + // partition 0. This is for small installs or local/debugging setups. + singlePartition := t.cfg.Target == SingleBinary + + ingester, err := ingester.New(t.cfg.Ingester, t.store, t.Overrides, prometheus.DefaultRegisterer, singlePartition) if err != nil { return nil, fmt.Errorf("failed to create ingester: %w", err) } @@ -328,6 +333,11 @@ func (t *App) initBlockBuilder() (services.Service, error) { t.cfg.BlockBuilder.IngestStorageConfig = t.cfg.Ingest t.cfg.BlockBuilder.IngestStorageConfig.Kafka.ConsumerGroup = blockbuilder.ConsumerGroup + if t.cfg.Target == SingleBinary && len(t.cfg.BlockBuilder.AssignedPartitions) == 0 { + // In SingleBinary mode always use partition 0. This is for small installs or local/debugging setups. + t.cfg.BlockBuilder.AssignedPartitions = append(t.cfg.BlockBuilder.AssignedPartitions, 0) + } + t.blockBuilder = blockbuilder.New(t.cfg.BlockBuilder, log.Logger, t.partitionRing, t.Overrides, t.store) return t.blockBuilder, nil diff --git a/modules/blockbuilder/blockbuilder.go b/modules/blockbuilder/blockbuilder.go index db9fd78bce4..d59af091d0a 100644 --- a/modules/blockbuilder/blockbuilder.go +++ b/modules/blockbuilder/blockbuilder.go @@ -111,19 +111,18 @@ func (b *BlockBuilder) starting(ctx context.Context) (err error) { } func (b *BlockBuilder) running(ctx context.Context) error { + // Initial polling and delay cycleEndTime := cycleEndAtStartup(time.Now(), b.cfg.ConsumeCycleDuration) - err := b.consumeCycle(ctx, cycleEndTime) - if err != nil { - return fmt.Errorf("failed to consume cycle: %w", err) - } - - cycleEndTime, waitTime := nextCycleEnd(time.Now(), b.cfg.ConsumeCycleDuration) + waitTime := 2 * time.Second for { select { case <-time.After(waitTime): - err = b.consumeCycle(ctx, cycleEndTime) + err := b.consumeCycle(ctx, cycleEndTime) if err != nil { - return fmt.Errorf("failed to consume cycle: %w", err) + b.logger.Log("msg", "consumeCycle failed", "err", err) + + // Don't progress cycle forward, keep trying at this timestamp + continue } cycleEndTime = cycleEndTime.Add(b.cfg.ConsumeCycleDuration) diff --git a/modules/distributor/config.go b/modules/distributor/config.go index b557dc6b07e..82c9edf89d1 100644 --- a/modules/distributor/config.go +++ b/modules/distributor/config.go @@ -89,3 +89,13 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.Usage.RegisterFlagsAndApplyDefaults(prefix, f) } + +func (cfg *Config) Validate() error { + if cfg.KafkaWritePathEnabled { + if err := cfg.KafkaConfig.Validate(); err != nil { + return err + } + } + + return nil +} diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index fecd6965b8e..3cace8f1fa5 100644 --- a/modules/distributor/distributor.go +++ b/modules/distributor/distributor.go @@ -183,6 +183,10 @@ func New( loggingLevel dslog.Level, reg prometheus.Registerer, ) (*Distributor, error) { + if err := cfg.Validate(); err != nil { + return nil, err + } + factory := cfg.factory if factory == nil { factory = func(addr string) (ring_client.PoolClient, error) { @@ -419,20 +423,21 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te return nil, err } - if len(d.overrides.MetricsGeneratorProcessors(userID)) > 0 { - d.generatorForwarder.SendTraces(ctx, userID, keys, rebatchedTraces) - } - if err := d.forwardersManager.ForTenant(userID).ForwardTraces(ctx, traces); err != nil { _ = level.Warn(d.logger).Log("msg", "failed to forward batches for tenant=%s: %w", userID, err) } - if d.cfg.KafkaWritePathEnabled { - err := d.sendWriteRequestsToPartitions(ctx, userID, keys, rebatchedTraces) + if d.kafkaProducer != nil { + err := d.sendToKafka(ctx, userID, keys, rebatchedTraces) if err != nil { // TODO: Handle error level.Error(d.logger).Log("msg", "failed to write to kafka", "err", err) } + } else { + // See if we need to send to the generators + if len(d.overrides.MetricsGeneratorProcessors(userID)) > 0 { + d.generatorForwarder.SendTraces(ctx, userID, keys, rebatchedTraces) + } } return nil, nil // PushRequest is ignored, so no reason to create one @@ -561,7 +566,7 @@ func (d *Distributor) UsageTrackerHandler() http.Handler { return nil } -func (d *Distributor) sendWriteRequestsToPartitions(ctx context.Context, userID string, keys []uint32, traces []*rebatchedTrace) error { +func (d *Distributor) sendToKafka(ctx context.Context, userID string, keys []uint32, traces []*rebatchedTrace) error { marshalledTraces := make([][]byte, len(traces)) for i, t := range traces { b, err := proto.Marshal(t.trace) diff --git a/modules/ingester/ingester.go b/modules/ingester/ingester.go index fe550c1a898..eac6c8c378d 100644 --- a/modules/ingester/ingester.go +++ b/modules/ingester/ingester.go @@ -89,7 +89,7 @@ type Ingester struct { } // New makes a new Ingester. -func New(cfg Config, store storage.Store, overrides overrides.Interface, reg prometheus.Registerer) (*Ingester, error) { +func New(cfg Config, store storage.Store, overrides overrides.Interface, reg prometheus.Registerer, singlePartition bool) (*Ingester, error) { i := &Ingester{ cfg: cfg, instances: map[string]*instance{}, @@ -110,9 +110,15 @@ func New(cfg Config, store storage.Store, overrides overrides.Interface, reg pro i.lifecycler = lc if ingestCfg := cfg.IngestStorageConfig; ingestCfg.Enabled { - i.ingestPartitionID, err = ingest.IngesterPartitionID(cfg.LifecyclerConfig.ID) - if err != nil { - return nil, fmt.Errorf("calculating ingester partition ID: %w", err) + if singlePartition { + // For single-binary don't require hostname to identify a partition. + // Assume partition 0. + i.ingestPartitionID = 0 + } else { + i.ingestPartitionID, err = ingest.IngesterPartitionID(cfg.LifecyclerConfig.ID) + if err != nil { + return nil, fmt.Errorf("calculating ingester partition ID: %w", err) + } } partitionRingKV := cfg.IngesterPartitionRing.KVStore.Mock diff --git a/modules/ingester/ingester_test.go b/modules/ingester/ingester_test.go index 6aafa5c5c9c..15c48ca19c1 100644 --- a/modules/ingester/ingester_test.go +++ b/modules/ingester/ingester_test.go @@ -320,7 +320,8 @@ func TestIngesterStartingReadOnly(t *testing.T) { defaultIngesterTestConfig(), defaultIngesterStore(t, t.TempDir()), limits, - prometheus.NewPedanticRegistry()) + prometheus.NewPedanticRegistry(), + false) require.NoError(t, err) _, err = ingester.PushBytesV2(ctx, &tempopb.PushBytesRequest{}) @@ -455,7 +456,7 @@ func defaultIngesterWithOverrides(t testing.TB, tmpDir string, o overrides.Confi s := defaultIngesterStore(t, tmpDir) - ingester, err := New(ingesterConfig, s, limits, prometheus.NewPedanticRegistry()) + ingester, err := New(ingesterConfig, s, limits, prometheus.NewPedanticRegistry(), false) require.NoError(t, err, "unexpected error creating ingester") ingester.replayJitter = false diff --git a/pkg/ingest/encoding.go b/pkg/ingest/encoding.go index f1d514d86a1..42d14fcfd0a 100644 --- a/pkg/ingest/encoding.go +++ b/pkg/ingest/encoding.go @@ -50,6 +50,7 @@ func Encode(partitionID int32, tenantID string, req *tempopb.PushBytesRequest, m var records []*kgo.Record batch := encoderPoolGet() defer encoderPoolPut(batch) + currentSize := 0 for i, entry := range req.Traces {