@@ -10,6 +10,7 @@ import (
 	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 	"unicode"
 	"unsafe"
@@ -79,6 +80,7 @@ var allowedLabelsForLevel = map[string]struct{}{
 type Config struct {
 	// Distributors ring
 	DistributorRing RingConfig `yaml:"ring,omitempty"`
+	PushWorkerCount int        `yaml:"push_worker_count"`
 
 	// For testing.
 	factory ring_client.PoolFactory `yaml:"-"`
@@ -102,7 +104,7 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
 	cfg.DistributorRing.RegisterFlags(fs)
 	cfg.RateStore.RegisterFlagsWithPrefix("distributor.rate-store", fs)
 	cfg.WriteFailuresLogging.RegisterFlagsWithPrefix("distributor.write-failures-logging", fs)
-
+	fs.IntVar(&cfg.PushWorkerCount, "distributor.push-worker-count", 256, "Number of workers to push batches to ingesters.")
 	fs.BoolVar(&cfg.KafkaEnabled, "distributor.kafka-writes-enabled", false, "Enable writes to Kafka during Push requests.")
 	fs.BoolVar(&cfg.IngesterEnabled, "distributor.ingester-writes-enabled", true, "Enable writes to Ingesters during Push requests. Defaults to true.")
 }
@@ -166,7 +168,9 @@ type Distributor struct {
 	replicationFactor prometheus.Gauge
 	streamShardCount  prometheus.Counter
 
-	usageTracker push.UsageTracker
+	usageTracker   push.UsageTracker
+	ingesterTasks  chan pushIngesterTask
+	ingesterTaskWg sync.WaitGroup
 
 	// kafka
 	kafkaWriter KafkaProducer
@@ -253,6 +257,7 @@ func New(
 		rateLimitStrat:  rateLimitStrat,
 		tee:             tee,
 		usageTracker:    usageTracker,
+		ingesterTasks:   make(chan pushIngesterTask),
 		ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
 			Namespace: constants.Loki,
 			Name:      "distributor_ingester_appends_total",
@@ -354,6 +359,15 @@ func (d *Distributor) starting(ctx context.Context) error {
 }
 
 func (d *Distributor) running(ctx context.Context) error {
+	ctx, cancel := context.WithCancel(ctx)
+	defer func() {
+		cancel()
+		d.ingesterTaskWg.Wait()
+	}()
+	d.ingesterTaskWg.Add(d.cfg.PushWorkerCount)
+	for i := 0; i < d.cfg.PushWorkerCount; i++ {
+		go d.pushIngesterWorker(ctx)
+	}
 	select {
 	case <-ctx.Done():
 		return nil
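
Taken together, the change to `running` above and the `pushIngesterWorker` method added further down replace the old one-goroutine-per-batch approach with a fixed-size worker pool fed by an unbuffered channel. A minimal, standalone sketch of that lifecycle wiring, using illustrative names rather than the actual Loki types:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type task struct{ id int }

func main() {
	const workerCount = 4 // stands in for cfg.PushWorkerCount

	ctx, cancel := context.WithCancel(context.Background())
	tasks := make(chan task) // unbuffered, like d.ingesterTasks
	var wg sync.WaitGroup    // like d.ingesterTaskWg

	wg.Add(workerCount)
	for i := 0; i < workerCount; i++ {
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done(): // pool winds down with the service context
					return
				case t := <-tasks:
					fmt.Println("processing task", t.id) // stands in for d.sendStreams(task)
				}
			}
		}()
	}

	for i := 0; i < 10; i++ {
		tasks <- task{id: i} // producers block until a worker is free
	}

	cancel()  // stop the pool...
	wg.Wait() // ...and wait for every worker to exit, as running() does on shutdown
}
```

Because the channel is unbuffered, `PushWorkerCount` effectively caps the number of in-flight ingester pushes; producers wait for a free worker (or bail out via their context, as the next hunk shows) instead of spawning an unbounded number of goroutines.
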
@@ -630,15 +644,26 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
 	}
 
 	for ingester, streams := range streamsByIngester {
-		go func(ingester ring.InstanceDesc, samples []*streamTracker) {
+		func(ingester ring.InstanceDesc, samples []*streamTracker) {
 			// Use a background context to make sure all ingesters get samples even if we return early
 			localCtx, cancel := context.WithTimeout(context.Background(), d.clientCfg.RemoteTimeout)
-			defer cancel()
 			localCtx = user.InjectOrgID(localCtx, tenantID)
 			if sp := opentracing.SpanFromContext(ctx); sp != nil {
 				localCtx = opentracing.ContextWithSpan(localCtx, sp)
 			}
-			d.sendStreams(localCtx, ingester, samples, &tracker)
+			select {
+			case <-ctx.Done():
+				cancel()
+				return
+			case d.ingesterTasks <- pushIngesterTask{
+				ingester:      ingester,
+				streamTracker: samples,
+				pushTracker:   &tracker,
+				ctx:           localCtx,
+				cancel:        cancel,
+			}:
+				return
+			}
 		}(ingesterDescs[ingester], streams)
 	}
 }
@@ -830,9 +855,30 @@ func (d *Distributor) truncateLines(vContext validationContext, stream *logproto
 	validation.MutatedBytes.WithLabelValues(validation.LineTooLong, vContext.userID).Add(float64(truncatedBytes))
 }
 
+type pushIngesterTask struct {
+	streamTracker []*streamTracker
+	pushTracker   *pushTracker
+	ingester      ring.InstanceDesc
+	ctx           context.Context
+	cancel        context.CancelFunc
+}
+
+func (d *Distributor) pushIngesterWorker(ctx context.Context) {
+	defer d.ingesterTaskWg.Done()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case task := <-d.ingesterTasks:
+			d.sendStreams(task)
+		}
+	}
+}
+
 // TODO taken from Cortex, see if we can refactor out an usable interface.
-func (d *Distributor) sendStreams(ctx context.Context, ingester ring.InstanceDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) {
-	err := d.sendStreamsErr(ctx, ingester, streamTrackers)
+func (d *Distributor) sendStreams(task pushIngesterTask) {
+	defer task.cancel()
+	err := d.sendStreamsErr(task.ctx, task.ingester, task.streamTracker)
 
 	// If we succeed, decrement each stream's pending count by one.
 	// If we reach the required number of successful puts on this stream, then
@@ -843,17 +889,17 @@ func (d *Distributor) sendStreams(ctx context.Context, ingester ring.InstanceDes
 	//
 	// The use of atomic increments here guarantees only a single sendStreams
 	// goroutine will write to either channel.
-	for i := range streamTrackers {
+	for i := range task.streamTracker {
 		if err != nil {
-			if streamTrackers[i].failed.Inc() <= int32(streamTrackers[i].maxFailures) {
+			if task.streamTracker[i].failed.Inc() <= int32(task.streamTracker[i].maxFailures) {
 				continue
 			}
-			pushTracker.doneWithResult(err)
+			task.pushTracker.doneWithResult(err)
 		} else {
-			if streamTrackers[i].succeeded.Inc() != int32(streamTrackers[i].minSuccess) {
+			if task.streamTracker[i].succeeded.Inc() != int32(task.streamTracker[i].minSuccess) {
 				continue
 			}
-			pushTracker.doneWithResult(nil)
+			task.pushTracker.doneWithResult(nil)
 		}
 	}
 }
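
The producer side in `Push` is the subtler half of this change: each per-ingester batch still gets its own timeout context derived from `context.Background()`, but ownership of its `cancel` now travels with the task. The worker releases it via `defer task.cancel()` in `sendStreams`, and the enqueueing code only calls `cancel` itself when the request context is done before the task was handed over. A rough, self-contained sketch of that hand-off, with illustrative names (`task`, `enqueue`, `remoteTimeout`) standing in for the real types:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// task mirrors the shape of pushIngesterTask: the work item carries its own
// context and the cancel func that releases it.
type task struct {
	ctx    context.Context
	cancel context.CancelFunc
}

// enqueue hands a work item to the pool without leaking its context.
// ctx is the request context; tasks and remoteTimeout stand in for
// d.ingesterTasks and d.clientCfg.RemoteTimeout (illustrative names only).
func enqueue(ctx context.Context, tasks chan<- task, remoteTimeout time.Duration) {
	// A background context lets the push to the ingester outlive the client
	// request, bounded only by the remote timeout.
	localCtx, cancel := context.WithTimeout(context.Background(), remoteTimeout)

	select {
	case <-ctx.Done():
		cancel() // never handed over: release the context on the producer side
	case tasks <- task{ctx: localCtx, cancel: cancel}:
		// handed over: the consumer now owns cancel and must call it when done
	}
}

func main() {
	tasks := make(chan task, 1) // buffered here so the example needs no worker goroutine

	enqueue(context.Background(), tasks, 5*time.Second)

	t := <-tasks
	defer t.cancel() // mirrors defer task.cancel() in sendStreams
	deadline, _ := t.ctx.Deadline()
	fmt.Println("task handed over, push deadline:", deadline)
}
```

Note that the original `defer cancel()` had to go: in the new flow the enclosing function returns as soon as the task is enqueued, so deferring the cancel there would tear down the context before the worker gets to use it.
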