@@ -8,13 +8,13 @@ import (
 	"time"
 
 	"github.com/go-kit/log/level"
+	"github.com/grafana/dskit/tenant"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/weaveworks/common/user"
 	"golang.org/x/net/context"
-
-	"github.com/grafana/dskit/tenant"
+	"golang.org/x/time/rate"
 
 	"github.com/grafana/loki/pkg/chunkenc"
 	"github.com/grafana/loki/pkg/storage/chunk"
@@ -27,6 +27,9 @@ const (
 	// position, not wallclock time.
 	flushBackoff = 1 * time.Second
 
+	// Lower bound on flushes per check period for rate-limiter
+	minFlushes = 100
+
 	nameLabel = "__name__"
 	logsValue = "logs"
 
@@ -87,13 +90,14 @@ func (o *flushOp) Priority() int64 {
 	return -int64(o.from)
 }
 
-// sweepUsers periodically schedules series for flushing and garbage collects users with no series
+// sweepUsers periodically schedules series for flushing and garbage collects users with no streams
 func (i *Ingester) sweepUsers(immediate, mayRemoveStreams bool) {
 	instances := i.getInstances()
 
 	for _, instance := range instances {
 		i.sweepInstance(instance, immediate, mayRemoveStreams)
 	}
+	i.setFlushRate()
 }
 
 func (i *Ingester) sweepInstance(instance *instance, immediate, mayRemoveStreams bool) {
@@ -125,6 +129,24 @@ func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate bool) {
 	})
 }
 
+// Compute a rate such as to spread calls to the store over nearly all of the flush period,
+// for example if we have 600 items in the queue and a period of 1 min we will send 10.5 per second.
+// Note that if the store can't keep up with this rate then it doesn't make any difference.
+func (i *Ingester) setFlushRate() {
+	totalQueueLength := 0
+	for _, q := range i.flushQueues {
+		totalQueueLength += q.Length()
+	}
+	const fudge = 1.05 // aim to finish a little bit before the end of the period
+	flushesPerSecond := float64(totalQueueLength) / i.cfg.FlushCheckPeriod.Seconds() * fudge
+	// Avoid going very slowly with tiny queues
+	if flushesPerSecond*i.cfg.FlushCheckPeriod.Seconds() < minFlushes {
+		flushesPerSecond = minFlushes / i.cfg.FlushCheckPeriod.Seconds()
+	}
+	level.Debug(util_log.Logger).Log("msg", "computed flush rate", "rate", flushesPerSecond)
+	i.flushRateLimiter.SetLimit(rate.Limit(flushesPerSecond))
+}
+
 func (i *Ingester) flushLoop(j int) {
 	defer func() {
 		level.Debug(util_log.Logger).Log("msg", "Ingester.flushLoop() exited")
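The arithmetic matches the comment's example: 600 queued items over a 1-minute period give 600/60 × 1.05 = 10.5 flushes per second. Below is a minimal standalone sketch of the same computation against the `golang.org/x/time/rate` API the patch imports; the `computeFlushRate` helper and the 30-second check period are illustrative assumptions, not part of the patch.

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// Assumed stand-ins for the ingester's config and the constant added above.
const (
	flushCheckPeriod = 30 * time.Second // hypothetical FlushCheckPeriod value
	minFlushes       = 100              // lower bound from the patch
)

// computeFlushRate mirrors setFlushRate's arithmetic for a given queue length.
func computeFlushRate(totalQueueLength int) rate.Limit {
	const fudge = 1.05 // finish slightly before the period ends
	flushesPerSecond := float64(totalQueueLength) / flushCheckPeriod.Seconds() * fudge
	// Never schedule fewer than minFlushes per check period.
	if flushesPerSecond*flushCheckPeriod.Seconds() < minFlushes {
		flushesPerSecond = minFlushes / flushCheckPeriod.Seconds()
	}
	return rate.Limit(flushesPerSecond)
}

func main() {
	limiter := rate.NewLimiter(rate.Inf, 1)

	// 600 items over this 30s period: 600/30 * 1.05 = 21 flushes/s.
	limiter.SetLimit(computeFlushRate(600))
	fmt.Println(limiter.Limit()) // 21

	// A tiny queue falls below minFlushes, so the floor applies:
	// 100/30 ≈ 3.33 flushes/s instead of 10 * 1.05 / 30 = 0.35.
	limiter.SetLimit(computeFlushRate(10))
	fmt.Println(limiter.Limit()) // ≈3.33
}
```

The lower bound means a nearly empty queue still drains at roughly minFlushes per period rather than trickling out one chunk at a time.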
@@ -138,6 +160,10 @@ func (i *Ingester) flushLoop(j int) {
 		}
 		op := o.(*flushOp)
 
+		if !op.immediate {
+			_ = i.flushRateLimiter.Wait(context.Background())
+		}
+
 		err := i.flushUserSeries(op.userID, op.fp, op.immediate)
 		if err != nil {
 			level.Error(util_log.WithUserID(op.userID, util_log.Logger)).Log("msg", "failed to flush", "err", err)
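Because `Wait` blocks until the limiter releases a token, the flush workers collectively drain the queue at the computed rate, while immediate flushes bypass the limiter entirely. A small sketch of that pacing behaviour; the 2-per-second limit and loop count are arbitrary examples, not values from the patch:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// 2 ops/second with burst 1: the first Wait returns immediately,
	// each subsequent one roughly 0.5s after the previous.
	limiter := rate.NewLimiter(rate.Limit(2), 1)

	start := time.Now()
	for n := 0; n < 5; n++ {
		// Wait blocks until an event is allowed. With context.Background()
		// and a positive limit it effectively never errors, which is why
		// the patch discards the return value.
		_ = limiter.Wait(context.Background())
		fmt.Printf("op %d at %.1fs\n", n, time.Since(start).Seconds())
	}
}
```

Sharing one limiter across all flush queues means the spread holds for the store as a whole, regardless of how many worker goroutines are pulling ops.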