-
Notifications
You must be signed in to change notification settings - Fork 3.6k
/
Copy pathcomposite_store_entry.go
235 lines (195 loc) · 7.67 KB
/
composite_store_entry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
package stores
import (
"context"
"fmt"
"time"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/go-kit/log/level"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/v3/pkg/storage/errors"
"github.com/grafana/loki/v3/pkg/storage/stores/index"
"github.com/grafana/loki/v3/pkg/storage/stores/index/stats"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
"github.com/grafana/loki/v3/pkg/util/validation"
)
// StoreLimits exposes the per-tenant query limits the store consults before
// executing a query.
type StoreLimits interface {
	// MaxChunksPerQueryFromStore returns the maximum number of chunks a single
	// query may fetch from the store for the given tenant.
	// NOTE(review): semantics of a zero value are not visible here — confirm
	// at call sites whether 0 means "unlimited".
	MaxChunksPerQueryFromStore(string) int
	// MaxQueryLength returns the maximum allowed query time span for the given
	// tenant; enforced by validateQueryTimeRange.
	MaxQueryLength(context.Context, string) time.Duration
}
// compositeStoreEntry pairs a Store with the point in time at which it becomes
// active — presumably one entry per schema period in a composite store
// (inferred from the name; the composite container is defined elsewhere).
type compositeStoreEntry struct {
	start model.Time // start of the period this store is responsible for
	Store             // embedded: all Store behavior delegated to the wrapped store
}
// storeEntry wires together the pieces needed to serve queries for a single
// store: index reads, chunk fetching, chunk writes, and tenant limits.
type storeEntry struct {
	limits      StoreLimits      // per-tenant query limits, consulted by validateQueryTimeRange
	stop        func()           // optional teardown hook, invoked by Stop; may be nil
	fetcher     *fetcher.Fetcher // downloads chunk data; exposed via GetChunkFetcher
	indexReader index.Reader     // serves all index lookups (chunk refs, series, labels, stats, volume, shards)
	ChunkWriter                  // embedded: the write path is delegated to the chunk writer
}
// GetChunks returns the chunk refs overlapping [from, through] for userID,
// grouped with the fetcher to use for downloading them.
//
// When storeChunksOverride is non-nil its refs are used verbatim instead of
// querying the index; either way the refs are filtered back down to the
// requested time range before being returned.
//
// Returns a shortcut empty result (all nil) when the validated time range is
// entirely in the future, and an error when the context is already cancelled,
// the range is invalid, or the index lookup fails.
func (c *storeEntry) GetChunks(
	ctx context.Context,
	userID string,
	from,
	through model.Time,
	predicate chunk.Predicate,
	storeChunksOverride *logproto.ChunkRefGroup,
) ([][]chunk.Chunk,
	[]*fetcher.Fetcher,
	error) {
	if ctx.Err() != nil {
		return nil, nil, ctx.Err()
	}

	shortcut, err := c.validateQueryTimeRange(ctx, userID, &from, &through)
	if err != nil {
		return nil, nil, err
	} else if shortcut {
		return nil, nil, nil
	}

	var refs []*logproto.ChunkRef
	if storeChunksOverride != nil {
		refs = storeChunksOverride.Refs
	} else {
		// TODO(owen-d): fix needless O(n) conversions that stem from difference in store impls (value)
		// vs proto impls (reference)
		values, err := c.indexReader.GetChunkRefs(ctx, userID, from, through, predicate)
		// Fix: previously a lookup error was carried through to the final
		// return alongside partially-built results; fail fast instead so a
		// non-nil error is never paired with a payload.
		if err != nil {
			return nil, nil, err
		}
		// convert to refs
		refs = make([]*logproto.ChunkRef, 0, len(values))
		for i := range values {
			refs = append(refs, &values[i])
		}
	}

	// Store overrides are passed through from the parent and can reference chunks not owned by this particular store,
	// so we filter them out based on the requested timerange passed, which is guaranteed to be within the store's timerange.
	// Otherwise, we'd return chunks that do not belong to the store, which would error during fetching.
	chunks := filterForTimeRange(refs, from, through)

	return [][]chunk.Chunk{chunks}, []*fetcher.Fetcher{c.fetcher}, nil
}
// filterForTimeRange builds stub chunk.Chunk values (ref only, no data) for
// every ref whose time span overlaps the requested [from, through] range.
// Zero-length refs sitting exactly at from are kept as well, since the
// half-open overlap test alone would drop them.
func filterForTimeRange(refs []*logproto.ChunkRef, from, through model.Time) []chunk.Chunk {
	out := make([]chunk.Chunk, 0, len(refs))
	for _, r := range refs {
		overlaps := through >= r.From && from < r.Through
		zeroLenAtFrom := r.From == from && r.Through == from
		if !overlaps && !zeroLenAtFrom {
			continue
		}
		out = append(out, chunk.Chunk{ChunkRef: *r})
	}
	return out
}
// GetSeries delegates series lookup for [from, through] and the given
// matchers straight to the underlying index reader.
// NOTE(review): unlike the sibling query methods this one does not call
// validateQueryTimeRange — presumably the caller validates; confirm.
func (c *storeEntry) GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) {
	return c.indexReader.GetSeries(ctx, userID, from, through, matchers...)
}
// SetChunkFilterer installs a request-scoped chunk filterer on the underlying
// index reader.
func (c *storeEntry) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {
	c.indexReader.SetChunkFilterer(chunkFilter)
}
// LabelNamesForMetricName retrieves all label names for a metric name.
// Returns a nil slice and nil error when the validated time range is
// entirely in the future (empty-result shortcut).
func (c *storeEntry) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, matchers ...*labels.Matcher) ([]string, error) {
	sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelNamesForMetricName")
	defer sp.Finish()

	log := spanlogger.FromContext(ctx)
	defer log.Span.Finish()

	shortcut, err := c.validateQueryTimeRange(ctx, userID, &from, &through)
	switch {
	case err != nil:
		return nil, err
	case shortcut:
		return nil, nil
	}

	level.Debug(log).Log("metric", metricName)

	return c.indexReader.LabelNamesForMetricName(ctx, userID, from, through, metricName, matchers...)
}
// LabelValuesForMetricName retrieves the values of labelName for the given
// metric name, time range and matchers from the index. Returns a nil slice
// and nil error when the validated time range allows an empty-result shortcut.
func (c *storeEntry) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) {
	sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelValuesForMetricName")
	defer sp.Finish()

	log := spanlogger.FromContext(ctx)
	defer log.Span.Finish()

	shortcut, err := c.validateQueryTimeRange(ctx, userID, &from, &through)
	switch {
	case err != nil:
		return nil, err
	case shortcut:
		return nil, nil
	}

	return c.indexReader.LabelValuesForMetricName(ctx, userID, from, through, metricName, labelName, matchers...)
}
// Stats returns index statistics for the given tenant, time range and
// matchers. A nil result with nil error signals the empty-result shortcut
// (whole range in the future).
func (c *storeEntry) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) {
	shortcut, err := c.validateQueryTimeRange(ctx, userID, &from, &through)
	if err != nil {
		return nil, err
	}
	if shortcut {
		return nil, nil
	}
	return c.indexReader.Stats(ctx, userID, from, through, matchers...)
}
// Volume returns per-stream/per-label volume information for the given
// tenant, range and matchers, delegating to the index reader after time-range
// validation. Returns nil, nil on the empty-result shortcut.
func (c *storeEntry) Volume(ctx context.Context, userID string, from, through model.Time, limit int32, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) (*logproto.VolumeResponse, error) {
	sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.Volume")
	defer sp.Finish()

	shortcut, err := c.validateQueryTimeRange(ctx, userID, &from, &through)
	if err != nil {
		return nil, err
	}
	if shortcut {
		return nil, nil
	}

	// err is necessarily nil at this point; it is logged anyway to keep the
	// span's key set identical.
	sp.LogKV(
		"user", userID,
		"from", from.Time(),
		"through", through.Time(),
		"matchers", syntax.MatchersString(matchers),
		"err", err,
		"limit", limit,
		"aggregateBy", aggregateBy,
	)

	return c.indexReader.Volume(ctx, userID, from, through, limit, targetLabels, aggregateBy, matchers...)
}
// GetShards asks the index for a sharding plan targeting roughly
// targetBytesPerShard per shard over the validated time range.
// The shortcut flag from validation is ignored here: the index query runs
// even for future-only ranges (only validation errors abort).
func (c *storeEntry) GetShards(
	ctx context.Context,
	userID string,
	from, through model.Time,
	targetBytesPerShard uint64,
	predicate chunk.Predicate,
) (*logproto.ShardsResponse, error) {
	if _, err := c.validateQueryTimeRange(ctx, userID, &from, &through); err != nil {
		return nil, err
	}
	return c.indexReader.GetShards(ctx, userID, from, through, targetBytesPerShard, predicate)
}
// HasForSeries reports whether the underlying index reader can serve
// ForSeries queries for the given time range, returning the implementation
// when available.
func (c *storeEntry) HasForSeries(from, through model.Time) (sharding.ForSeries, bool) {
	return c.indexReader.HasForSeries(from, through)
}
// validateQueryTimeRange sanity-checks and normalizes the query time range,
// mutating *from and *through in place.
//
// It returns (true, nil) when the caller may short-circuit with an empty
// result set (the whole range lies in the future), a non-nil error when the
// range is inverted or exceeds the tenant's MaxQueryLength, and (false, nil)
// otherwise — possibly after clamping *through down to now.
func (c *storeEntry) validateQueryTimeRange(ctx context.Context, userID string, from *model.Time, through *model.Time) (bool, error) {
	//nolint:ineffassign,staticcheck //Leaving ctx even though we don't currently use it, we want to make it available for when we might need it and hopefully will ensure us using the correct context at that time
	if *through < *from {
		return false, errors.QueryError(fmt.Sprintf("invalid query, through < from (%s < %s)", through, from))
	}

	// Reject ranges longer than the tenant's configured limit (0 disables the check).
	maxQueryLength := c.limits.MaxQueryLength(ctx, userID)
	if maxQueryLength > 0 && (*through).Sub(*from) > maxQueryLength {
		return false, errors.QueryError(fmt.Sprintf(validation.ErrQueryTooLong, model.Duration((*through).Sub(*from)), model.Duration(maxQueryLength)))
	}

	now := model.Now()

	if from.After(now) {
		// time-span start is in future ... regard as legal
		level.Info(util_log.WithContext(ctx, util_log.Logger)).Log("msg", "whole timerange in future, yield empty resultset", "through", through, "from", from, "now", now)
		return true, nil
	}

	// Allow a small (5m) grace window before clamping the end of the range.
	if through.After(now.Add(5 * time.Minute)) {
		// time-span end is in future ... regard as legal
		level.Info(util_log.WithContext(ctx, util_log.Logger)).Log("msg", "adjusting end timerange from future to now", "old_through", through, "new_through", now)
		*through = now // Avoid processing future part - otherwise some schemas could fail with eg non-existent table gripes
	}

	return false, nil
}
// GetChunkFetcher returns this entry's fetcher. The time argument is ignored
// because a storeEntry owns exactly one fetcher.
func (c *storeEntry) GetChunkFetcher(_ model.Time) *fetcher.Fetcher {
	return c.fetcher
}
// Stop tears down the store entry by invoking the registered stop hook,
// if one was provided.
func (c *storeEntry) Stop() {
	if c.stop == nil {
		return
	}
	c.stop()
}