Skip to content

Commit 36d61a5

Browse files
fix: Introduce feature flag for [last|first]_over_time sharding. (#13067)
(cherry picked from commit 6e45550)
1 parent b3a3c77 commit 36d61a5

File tree

4 files changed

+51
-32
lines changed

4 files changed

+51
-32
lines changed

docs/sources/shared/configuration.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -3828,7 +3828,8 @@ results_cache:
38283828
[parallelise_shardable_queries: <boolean> | default = true]
38293829
38303830
# A comma-separated list of LogQL vector and range aggregations that should be
3831-
# sharded
3831+
# sharded. Possible values 'quantile_over_time', 'last_over_time',
3832+
# 'first_over_time'.
38323833
# CLI flag: -querier.shard-aggregations
38333834
[shard_aggregations: <string> | default = ""]
38343835

pkg/logql/downstream_test.go

+26-29
Original file line numberDiff line numberDiff line change
@@ -37,38 +37,40 @@ func TestMappingEquivalence(t *testing.T) {
3737
for _, tc := range []struct {
3838
query string
3939
approximate bool
40+
shardAgg []string
4041
}{
41-
{`1`, false},
42-
{`1 + 1`, false},
43-
{`{a="1"}`, false},
44-
{`{a="1"} |= "number: 10"`, false},
45-
{`rate({a=~".+"}[1s])`, false},
46-
{`sum by (a) (rate({a=~".+"}[1s]))`, false},
47-
{`sum(rate({a=~".+"}[1s]))`, false},
48-
{`max without (a) (rate({a=~".+"}[1s]))`, false},
49-
{`count(rate({a=~".+"}[1s]))`, false},
50-
{`avg(rate({a=~".+"}[1s]))`, true},
51-
{`avg(rate({a=~".+"}[1s])) by (a)`, true},
52-
{`1 + sum by (cluster) (rate({a=~".+"}[1s]))`, false},
53-
{`sum(max(rate({a=~".+"}[1s])))`, false},
54-
{`max(count(rate({a=~".+"}[1s])))`, false},
55-
{`max(sum by (cluster) (rate({a=~".+"}[1s]))) / count(rate({a=~".+"}[1s]))`, false},
56-
{`sum(rate({a=~".+"} |= "foo" != "foo"[1s]) or vector(1))`, false},
57-
{`avg_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false},
58-
{`avg_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, true},
59-
{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s])`, true},
42+
{`1`, false, nil},
43+
{`1 + 1`, false, nil},
44+
{`{a="1"}`, false, nil},
45+
{`{a="1"} |= "number: 10"`, false, nil},
46+
{`rate({a=~".+"}[1s])`, false, nil},
47+
{`sum by (a) (rate({a=~".+"}[1s]))`, false, nil},
48+
{`sum(rate({a=~".+"}[1s]))`, false, nil},
49+
{`max without (a) (rate({a=~".+"}[1s]))`, false, nil},
50+
{`count(rate({a=~".+"}[1s]))`, false, nil},
51+
{`avg(rate({a=~".+"}[1s]))`, true, nil},
52+
{`avg(rate({a=~".+"}[1s])) by (a)`, true, nil},
53+
{`1 + sum by (cluster) (rate({a=~".+"}[1s]))`, false, nil},
54+
{`sum(max(rate({a=~".+"}[1s])))`, false, nil},
55+
{`max(count(rate({a=~".+"}[1s])))`, false, nil},
56+
{`max(sum by (cluster) (rate({a=~".+"}[1s]))) / count(rate({a=~".+"}[1s]))`, false, nil},
57+
{`sum(rate({a=~".+"} |= "foo" != "foo"[1s]) or vector(1))`, false, nil},
58+
{`avg_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, nil},
59+
{`avg_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, true, nil},
60+
{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s])`, true, []string{ShardQuantileOverTime}},
6061
{
6162
`
6263
(quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s]) by (a) > 1)
6364
and
6465
avg by (a) (rate({a=~".+"}[1s]))
6566
`,
6667
false,
68+
nil,
6769
},
68-
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false},
69-
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false},
70-
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false},
71-
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false},
70+
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, []string{ShardFirstOverTime}},
71+
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false, []string{ShardFirstOverTime}},
72+
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, []string{ShardLastOverTime}},
73+
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false, []string{ShardLastOverTime}},
7274
// topk prefers already-seen values in tiebreakers. Since the test data generates
7375
// the same log lines for each series & the resulting promql.Vectors aren't deterministically
7476
// sorted by labels, we don't expect this to pass.
@@ -102,12 +104,7 @@ func TestMappingEquivalence(t *testing.T) {
102104
ctx := user.InjectOrgID(context.Background(), "fake")
103105

104106
strategy := NewPowerOfTwoStrategy(ConstantShards(shards))
105-
mapper := NewShardMapper(strategy, nilShardMetrics, []string{})
106-
// TODO (callum) refactor this test so that we won't need to set every
107-
// possible sharding config option to true when we have multiple in the future
108-
if tc.approximate {
109-
mapper.quantileOverTimeSharding = true
110-
}
107+
mapper := NewShardMapper(strategy, nilShardMetrics, tc.shardAgg)
111108
_, _, mapped, err := mapper.Parse(params.GetExpression())
112109
require.NoError(t, err)
113110

pkg/logql/shardmapper.go

+22-1
Original file line numberDiff line numberDiff line change
@@ -13,26 +13,39 @@ import (
1313
)
1414

1515
const (
16+
ShardLastOverTime = "last_over_time"
17+
ShardFirstOverTime = "first_over_time"
1618
ShardQuantileOverTime = "quantile_over_time"
1719
)
1820

1921
type ShardMapper struct {
2022
shards ShardingStrategy
2123
metrics *MapperMetrics
2224
quantileOverTimeSharding bool
25+
lastOverTimeSharding bool
26+
firstOverTimeSharding bool
2327
}
2428

2529
func NewShardMapper(strategy ShardingStrategy, metrics *MapperMetrics, shardAggregation []string) ShardMapper {
2630
quantileOverTimeSharding := false
31+
lastOverTimeSharding := false
32+
firstOverTimeSharding := false
2733
for _, a := range shardAggregation {
28-
if a == ShardQuantileOverTime {
34+
switch a {
35+
case ShardQuantileOverTime:
2936
quantileOverTimeSharding = true
37+
case ShardLastOverTime:
38+
lastOverTimeSharding = true
39+
case ShardFirstOverTime:
40+
firstOverTimeSharding = true
3041
}
3142
}
3243
return ShardMapper{
3344
shards: strategy,
3445
metrics: metrics,
3546
quantileOverTimeSharding: quantileOverTimeSharding,
47+
firstOverTimeSharding: firstOverTimeSharding,
48+
lastOverTimeSharding: lastOverTimeSharding,
3649
}
3750
}
3851

@@ -472,6 +485,10 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
472485
}, bytesPerShard, nil
473486

474487
case syntax.OpRangeTypeFirst:
488+
if !m.firstOverTimeSharding {
489+
return noOp(expr, m.shards.Resolver())
490+
}
491+
475492
potentialConflict := syntax.ReducesLabels(expr)
476493
if !potentialConflict && (expr.Grouping == nil || expr.Grouping.Noop()) {
477494
return m.mapSampleExpr(expr, r)
@@ -499,6 +516,10 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
499516
downstreams: downstreams,
500517
}, bytesPerShard, nil
501518
case syntax.OpRangeTypeLast:
519+
if !m.lastOverTimeSharding {
520+
return noOp(expr, m.shards.Resolver())
521+
}
522+
502523
potentialConflict := syntax.ReducesLabels(expr)
503524
if !potentialConflict && (expr.Grouping == nil || expr.Grouping.Noop()) {
504525
return m.mapSampleExpr(expr, r)

pkg/querier/queryrange/queryrangebase/roundtrip.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
5252

5353
cfg.ShardAggregations = []string{}
5454
f.Var(&cfg.ShardAggregations, "querier.shard-aggregations",
55-
"A comma-separated list of LogQL vector and range aggregations that should be sharded")
55+
"A comma-separated list of LogQL vector and range aggregations that should be sharded. Possible values 'quantile_over_time', 'last_over_time', 'first_over_time'.")
5656

5757
cfg.ResultsCacheConfig.RegisterFlags(f)
5858
}

0 commit comments

Comments
 (0)