package ingester

import (
	"fmt"
	"math"
	"sync"
	"time"

	"golang.org/x/time/rate"

	"github.com/grafana/loki/v3/pkg/distributor/shardstreams"
	"github.com/grafana/loki/v3/pkg/validation"
)

const (
	errMaxStreamsPerUserLimitExceeded = "tenant '%v' per-user streams limit exceeded, streams: %d exceeds calculated limit: %d (local limit: %d, global limit: %d, global/ingesters: %d)"
)

// RingCount is the interface exposed by a ring implementation that allows
// counting its healthy members.
type RingCount interface {
HealthyInstancesCount() int
}
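
// Limits is the interface the Limiter uses to look up per-tenant limits.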
type Limits interface {
UnorderedWrites(userID string) bool
MaxLocalStreamsPerUser(userID string) int
MaxGlobalStreamsPerUser(userID string) int
PerStreamRateLimit(userID string) validation.RateLimit
ShardStreams(userID string) shardstreams.Config
}

// Limiter implements primitives to get the maximum number of streams
// an ingester can handle for a specific tenant.
type Limiter struct {
limits Limits
ring RingCount
replicationFactor int
metrics *ingesterMetrics
mtx sync.RWMutex
disabled bool
}
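
// DisableForWALReplay disables the limiter so that entries accepted before a
// restart are not rejected while the WAL is replayed.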
func (l *Limiter) DisableForWALReplay() {
l.mtx.Lock()
defer l.mtx.Unlock()
l.disabled = true
l.metrics.limiterEnabled.Set(0)
}
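
// Enable re-enables the limiter once WAL replay has completed.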
func (l *Limiter) Enable() {
l.mtx.Lock()
defer l.mtx.Unlock()
l.disabled = false
l.metrics.limiterEnabled.Set(1)
}

// NewLimiter makes a new limiter.
func NewLimiter(limits Limits, metrics *ingesterMetrics, ring RingCount, replicationFactor int) *Limiter {
return &Limiter{
limits: limits,
ring: ring,
replicationFactor: replicationFactor,
metrics: metrics,
}
}
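
// UnorderedWrites returns whether the tenant may ingest out-of-order entries.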
func (l *Limiter) UnorderedWrites(userID string) bool {
// WAL replay should not discard previously ack'd writes,
// so allow out of order writes while the limiter is disabled.
// This allows replaying unordered WALs into ordered configurations.
if l.disabled {
return true
}
return l.limits.UnorderedWrites(userID)
}

// AssertMaxStreamsPerUser checks whether the given number of streams for the
// tenant is below the configured limit, and returns an error if it is not.
func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error {
// Until the limiter actually starts, all accesses are successful.
// This is used to disable limits while recovering from the WAL.
l.mtx.RLock()
defer l.mtx.RUnlock()
if l.disabled {
return nil
}
// Start by setting the local limit either from override or default
localLimit := l.limits.MaxLocalStreamsPerUser(userID)
	// We can assume that streams are evenly distributed across ingesters,
	// so we convert the global limit into a local, per-ingester limit.
globalLimit := l.limits.MaxGlobalStreamsPerUser(userID)
adjustedGlobalLimit := l.convertGlobalToLocalLimit(globalLimit)
// Set the calculated limit to the lesser of the local limit or the new calculated global limit
calculatedLimit := l.minNonZero(localLimit, adjustedGlobalLimit)
	// If both the local and global limits are disabled, we fall back to the
	// largest int32 value, which effectively means no limit.
if calculatedLimit == 0 {
calculatedLimit = math.MaxInt32
}
if streams < calculatedLimit {
return nil
}
return fmt.Errorf(errMaxStreamsPerUserLimitExceeded, userID, streams, calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit)
}
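
// convertGlobalToLocalLimit computes this ingester's share of the global
// stream limit, scaled by the replication factor.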
func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int {
if globalLimit == 0 {
return 0
}
	// Given we don't need a perfectly accurate count (e.g. when the ingester
	// topology changes) and we prefer to err in favor of the tenant, we can
	// use a per-ingester limit equal to:
	// (global limit / number of ingesters) * replication factor
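	//
	// For example, a global limit of 1000 streams across 10 healthy ingesters
	// with a replication factor of 3 yields (1000 / 10) * 3 = 300 streams
	// allowed on this ingester.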
numIngesters := l.ring.HealthyInstancesCount()
	// numIngesters may be zero because the ring is updated asynchronously.
	// If that happens, we temporarily ignore the global limit.
if numIngesters > 0 {
return int((float64(globalLimit) / float64(numIngesters)) * float64(l.replicationFactor))
}
return 0
}
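
// minNonZero returns the smaller of the two values, treating zero as
// "unlimited": e.g. minNonZero(0, 5) == 5 and minNonZero(3, 5) == 3.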
func (l *Limiter) minNonZero(first, second int) int {
if first == 0 || (second != 0 && first > second) {
return second
}
return first
}
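
// RateLimiterStrategy provides the current per-stream rate limit for a tenant.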
type RateLimiterStrategy interface {
RateLimit(tenant string) validation.RateLimit
}
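
// RateLimit returns the per-stream rate limit for the tenant, or an unlimited
// rate while the limiter is disabled for WAL replay.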
func (l *Limiter) RateLimit(tenant string) validation.RateLimit {
if l.disabled {
return validation.Unlimited
}
return l.limits.PerStreamRateLimit(tenant)
}
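
// StreamRateLimiter enforces the per-stream rate limit for a single stream,
// periodically rechecking the configured limit for its tenant.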
type StreamRateLimiter struct {
recheckPeriod time.Duration
recheckAt time.Time
strategy RateLimiterStrategy
tenant string
lim *rate.Limiter
}
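
// NewStreamRateLimiter returns a StreamRateLimiter for the given tenant that
// refreshes its limit from the strategy every recheckPeriod.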
func NewStreamRateLimiter(strategy RateLimiterStrategy, tenant string, recheckPeriod time.Duration) *StreamRateLimiter {
rl := strategy.RateLimit(tenant)
return &StreamRateLimiter{
recheckPeriod: recheckPeriod,
strategy: strategy,
tenant: tenant,
lim: rate.NewLimiter(rl.Limit, rl.Burst),
}
}
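
// AllowN reports whether n events may happen at time at. It re-reads the
// tenant's rate limit once per recheck period and rebuilds the underlying
// limiter if the limit or burst has changed.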
func (l *StreamRateLimiter) AllowN(at time.Time, n int) bool {
now := time.Now()
if now.After(l.recheckAt) {
l.recheckAt = now.Add(l.recheckPeriod)
oldLim := l.lim.Limit()
oldBurst := l.lim.Burst()
next := l.strategy.RateLimit(l.tenant)
if oldLim != next.Limit || oldBurst != next.Burst {
// Edge case: rate.Inf doesn't advance nicely when reconfigured.
// To simplify, we just create a new limiter after reconfiguration rather
// than alter the existing one.
l.lim = rate.NewLimiter(next.Limit, next.Burst)
}
}
return l.lim.AllowN(at, n)
}