Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Introduce feature flag for [last|first]_over_time sharding. #13067

Merged
merged 1 commit into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3879,7 +3879,8 @@ results_cache:
[parallelise_shardable_queries: <boolean> | default = true]

# A comma-separated list of LogQL vector and range aggregations that should be
# sharded
# sharded. Possible values 'quantile_over_time', 'last_over_time',
# 'first_over_time'.
# CLI flag: -querier.shard-aggregations
[shard_aggregations: <string> | default = ""]

Expand Down
55 changes: 26 additions & 29 deletions pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,38 +37,40 @@ func TestMappingEquivalence(t *testing.T) {
for _, tc := range []struct {
query string
approximate bool
shardAgg []string
}{
{`1`, false},
{`1 + 1`, false},
{`{a="1"}`, false},
{`{a="1"} |= "number: 10"`, false},
{`rate({a=~".+"}[1s])`, false},
{`sum by (a) (rate({a=~".+"}[1s]))`, false},
{`sum(rate({a=~".+"}[1s]))`, false},
{`max without (a) (rate({a=~".+"}[1s]))`, false},
{`count(rate({a=~".+"}[1s]))`, false},
{`avg(rate({a=~".+"}[1s]))`, true},
{`avg(rate({a=~".+"}[1s])) by (a)`, true},
{`1 + sum by (cluster) (rate({a=~".+"}[1s]))`, false},
{`sum(max(rate({a=~".+"}[1s])))`, false},
{`max(count(rate({a=~".+"}[1s])))`, false},
{`max(sum by (cluster) (rate({a=~".+"}[1s]))) / count(rate({a=~".+"}[1s]))`, false},
{`sum(rate({a=~".+"} |= "foo" != "foo"[1s]) or vector(1))`, false},
{`avg_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false},
{`avg_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, true},
{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s])`, true},
{`1`, false, nil},
{`1 + 1`, false, nil},
{`{a="1"}`, false, nil},
{`{a="1"} |= "number: 10"`, false, nil},
{`rate({a=~".+"}[1s])`, false, nil},
{`sum by (a) (rate({a=~".+"}[1s]))`, false, nil},
{`sum(rate({a=~".+"}[1s]))`, false, nil},
{`max without (a) (rate({a=~".+"}[1s]))`, false, nil},
{`count(rate({a=~".+"}[1s]))`, false, nil},
{`avg(rate({a=~".+"}[1s]))`, true, nil},
{`avg(rate({a=~".+"}[1s])) by (a)`, true, nil},
{`1 + sum by (cluster) (rate({a=~".+"}[1s]))`, false, nil},
{`sum(max(rate({a=~".+"}[1s])))`, false, nil},
{`max(count(rate({a=~".+"}[1s])))`, false, nil},
{`max(sum by (cluster) (rate({a=~".+"}[1s]))) / count(rate({a=~".+"}[1s]))`, false, nil},
{`sum(rate({a=~".+"} |= "foo" != "foo"[1s]) or vector(1))`, false, nil},
{`avg_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, nil},
{`avg_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, true, nil},
{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s])`, true, []string{ShardQuantileOverTime}},
{
`
(quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s]) by (a) > 1)
and
avg by (a) (rate({a=~".+"}[1s]))
`,
false,
nil,
},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, []string{ShardFirstOverTime}},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false, []string{ShardFirstOverTime}},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, []string{ShardLastOverTime}},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false, []string{ShardLastOverTime}},
// topk prefers already-seen values in tiebreakers. Since the test data generates
// the same log lines for each series & the resulting promql.Vectors aren't deterministically
// sorted by labels, we don't expect this to pass.
Expand Down Expand Up @@ -102,12 +104,7 @@ func TestMappingEquivalence(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "fake")

strategy := NewPowerOfTwoStrategy(ConstantShards(shards))
mapper := NewShardMapper(strategy, nilShardMetrics, []string{})
// TODO (callum) refactor this test so that we won't need to set every
// possible sharding config option to true when we have multiple in the future
if tc.approximate {
mapper.quantileOverTimeSharding = true
}
mapper := NewShardMapper(strategy, nilShardMetrics, tc.shardAgg)
_, _, mapped, err := mapper.Parse(params.GetExpression())
require.NoError(t, err)

Expand Down
23 changes: 22 additions & 1 deletion pkg/logql/shardmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,39 @@ import (
)

// Aggregation names recognised by NewShardMapper in its shardAggregation
// list. They are the same values listed in the help text of the
// -querier.shard-aggregations flag.
const (
// ShardLastOverTime is the list entry that turns on last_over_time sharding.
ShardLastOverTime = "last_over_time"
// ShardFirstOverTime is the list entry that turns on first_over_time sharding.
ShardFirstOverTime = "first_over_time"
// ShardQuantileOverTime is the list entry that turns on quantile_over_time sharding.
ShardQuantileOverTime = "quantile_over_time"
)

// ShardMapper carries the sharding strategy, the mapper metrics and one
// opt-in switch per aggregation whose sharding is feature-flagged.
// Values are built by NewShardMapper.
type ShardMapper struct {
// shards is the strategy; its Resolver() is handed to noOp when a
// feature-flagged aggregation is left unsharded.
shards ShardingStrategy
// metrics is stored as passed to NewShardMapper.
metrics *MapperMetrics
// quantileOverTimeSharding is set when "quantile_over_time" was listed.
quantileOverTimeSharding bool
// lastOverTimeSharding is set when "last_over_time" was listed; when false,
// mapRangeAggregationExpr returns the last_over_time expression as a no-op.
lastOverTimeSharding bool
// firstOverTimeSharding is set when "first_over_time" was listed; when false,
// mapRangeAggregationExpr returns the first_over_time expression as a no-op.
firstOverTimeSharding bool
}

func NewShardMapper(strategy ShardingStrategy, metrics *MapperMetrics, shardAggregation []string) ShardMapper {
quantileOverTimeSharding := false
lastOverTimeSharding := false
firstOverTimeSharding := false
for _, a := range shardAggregation {
if a == ShardQuantileOverTime {
switch a {
case ShardQuantileOverTime:
quantileOverTimeSharding = true
case ShardLastOverTime:
lastOverTimeSharding = true
case ShardFirstOverTime:
firstOverTimeSharding = true
}
}
return ShardMapper{
shards: strategy,
metrics: metrics,
quantileOverTimeSharding: quantileOverTimeSharding,
firstOverTimeSharding: firstOverTimeSharding,
lastOverTimeSharding: lastOverTimeSharding,
}
}

Expand Down Expand Up @@ -472,6 +485,10 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
}, bytesPerShard, nil

case syntax.OpRangeTypeFirst:
if !m.firstOverTimeSharding {
return noOp(expr, m.shards.Resolver())
}

potentialConflict := syntax.ReducesLabels(expr)
if !potentialConflict && (expr.Grouping == nil || expr.Grouping.Noop()) {
return m.mapSampleExpr(expr, r)
Expand Down Expand Up @@ -499,6 +516,10 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
downstreams: downstreams,
}, bytesPerShard, nil
case syntax.OpRangeTypeLast:
if !m.lastOverTimeSharding {
return noOp(expr, m.shards.Resolver())
}

potentialConflict := syntax.ReducesLabels(expr)
if !potentialConflict && (expr.Grouping == nil || expr.Grouping.Noop()) {
return m.mapSampleExpr(expr, r)
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/queryrange/queryrangebase/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

cfg.ShardAggregations = []string{}
f.Var(&cfg.ShardAggregations, "querier.shard-aggregations",
"A comma-separated list of LogQL vector and range aggregations that should be sharded")
"A comma-separated list of LogQL vector and range aggregations that should be sharded. Possible values 'quantile_over_time', 'last_over_time', 'first_over_time'.")

cfg.ResultsCacheConfig.RegisterFlags(f)
}
Expand Down
Loading