Skip to content

Commit c88b9f4

Browse files
committed
simplify+pointer for shard chunks
Signed-off-by: Owen Diehl <[email protected]>
1 parent 4639cfd commit c88b9f4

File tree

3 files changed

+37
-60
lines changed

3 files changed

+37
-60
lines changed

pkg/logql/downstream.go

+15-58
Original file line numberDiff line numberDiff line change
@@ -394,22 +394,11 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(
394394

395395
case DownstreamSampleExpr:
396396
// downstream to a querier
397-
var shards Shards
398-
if e.shard != nil {
399-
shards = append(shards, e.shard.Shard)
400-
params = ParamsWithChunkOverrides{
401-
Params: params,
402-
StoreChunksOverride: &e.shard.chunks,
403-
}
404-
}
405397
acc := NewBufferedAccumulator(1)
406398
results, err := ev.Downstream(ctx, []DownstreamQuery{{
407-
Params: ParamsWithShardsOverride{
408-
Params: ParamsWithExpressionOverride{
409-
Params: params,
410-
ExpressionOverride: e.SampleExpr,
411-
},
412-
ShardsOverride: shards.Encode(),
399+
Params: ParamsWithExpressionOverride{
400+
Params: ParamOverridesFromShard(params, e.shard),
401+
ExpressionOverride: e.SampleExpr,
413402
},
414403
}}, acc)
415404
if err != nil {
@@ -422,16 +411,10 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(
422411
var queries []DownstreamQuery
423412
for cur != nil {
424413
qry := DownstreamQuery{
425-
Params: ParamsWithExpressionOverride{Params: params, ExpressionOverride: cur.DownstreamSampleExpr.SampleExpr},
426-
}
427-
if shard := cur.DownstreamSampleExpr.shard; shard != nil {
428-
qry.Params = ParamsWithShardsOverride{
429-
Params: ParamsWithChunkOverrides{
430-
Params: qry.Params,
431-
StoreChunksOverride: &shard.chunks,
432-
},
433-
ShardsOverride: Shards{shard.Shard}.Encode(),
434-
}
414+
Params: ParamsWithExpressionOverride{
415+
Params: ParamOverridesFromShard(params, cur.DownstreamSampleExpr.shard),
416+
ExpressionOverride: cur.DownstreamSampleExpr.SampleExpr,
417+
},
435418
}
436419
queries = append(queries, qry)
437420
cur = cur.next
@@ -464,19 +447,10 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(
464447
for _, d := range e.quantileMergeExpr.downstreams {
465448
qry := DownstreamQuery{
466449
Params: ParamsWithExpressionOverride{
467-
Params: params,
450+
Params: ParamOverridesFromShard(params, d.shard),
468451
ExpressionOverride: d.SampleExpr,
469452
},
470453
}
471-
if shard := d.shard; shard != nil {
472-
qry.Params = ParamsWithShardsOverride{
473-
Params: ParamsWithChunkOverrides{
474-
Params: qry.Params,
475-
StoreChunksOverride: &shard.chunks,
476-
},
477-
ShardsOverride: Shards{shard.Shard}.Encode(),
478-
}
479-
}
480454
queries = append(queries, qry)
481455
}
482456
}
@@ -512,22 +486,11 @@ func (ev *DownstreamEvaluator) NewIterator(
512486
switch e := expr.(type) {
513487
case DownstreamLogSelectorExpr:
514488
// downstream to a querier
515-
var shards Shards
516-
if e.shard != nil {
517-
shards = append(shards, e.shard.Shard)
518-
params = ParamsWithChunkOverrides{
519-
Params: params,
520-
StoreChunksOverride: &e.shard.chunks,
521-
}
522-
}
523489
acc := NewStreamAccumulator(params)
524490
results, err := ev.Downstream(ctx, []DownstreamQuery{{
525-
Params: ParamsWithShardsOverride{
526-
Params: ParamsWithExpressionOverride{
527-
Params: params,
528-
ExpressionOverride: e.LogSelectorExpr,
529-
},
530-
ShardsOverride: shards.Encode(),
491+
Params: ParamsWithExpressionOverride{
492+
Params: ParamOverridesFromShard(params, e.shard),
493+
ExpressionOverride: e.LogSelectorExpr,
531494
},
532495
}}, acc)
533496
if err != nil {
@@ -540,16 +503,10 @@ func (ev *DownstreamEvaluator) NewIterator(
540503
var queries []DownstreamQuery
541504
for cur != nil {
542505
qry := DownstreamQuery{
543-
Params: ParamsWithExpressionOverride{Params: params, ExpressionOverride: cur.DownstreamLogSelectorExpr.LogSelectorExpr},
544-
}
545-
if shard := cur.DownstreamLogSelectorExpr.shard; shard != nil {
546-
qry.Params = ParamsWithShardsOverride{
547-
Params: ParamsWithChunkOverrides{
548-
Params: qry.Params,
549-
StoreChunksOverride: &shard.chunks,
550-
},
551-
ShardsOverride: Shards{shard.Shard}.Encode(),
552-
}
506+
Params: ParamsWithExpressionOverride{
507+
Params: ParamOverridesFromShard(params, cur.DownstreamLogSelectorExpr.shard),
508+
ExpressionOverride: cur.DownstreamLogSelectorExpr.LogSelectorExpr,
509+
},
553510
}
554511
queries = append(queries, qry)
555512
cur = cur.next

pkg/logql/evaluator.go

+20
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,26 @@ func (p ParamsWithChunkOverrides) GetStoreChunks() *logproto.ChunkRefGroup {
157157
return p.StoreChunksOverride
158158
}
159159

160+
func ParamOverridesFromShard(base Params, shard *ShardWithChunkRefs) (result Params) {
161+
if shard == nil {
162+
return base
163+
}
164+
165+
result = ParamsWithShardsOverride{
166+
Params: base,
167+
ShardsOverride: Shards{shard.Shard}.Encode(),
168+
}
169+
170+
if shard.chunks != nil {
171+
result = ParamsWithChunkOverrides{
172+
Params: result,
173+
StoreChunksOverride: shard.chunks,
174+
}
175+
}
176+
177+
return result
178+
}
179+
160180
// Sortable logql contain sort or sort_desc.
161181
func Sortable(q Params) (bool, error) {
162182
var sortable bool

pkg/logql/shards.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func (s PowerOfTwoStrategy) Shards(expr syntax.Expr) ([]ShardWithChunkRefs, uint
163163
// and are used to precompute chunk refs for each group
164164
type ShardWithChunkRefs struct {
165165
Shard
166-
chunks logproto.ChunkRefGroup
166+
chunks *logproto.ChunkRefGroup
167167
}
168168

169169
// Shard represents a shard annotation
@@ -208,7 +208,7 @@ func (s Shard) Bind(chunks *logproto.ChunkRefGroup) *ShardWithChunkRefs {
208208
Shard: s,
209209
}
210210
if chunks != nil {
211-
res.chunks = *chunks
211+
res.chunks = chunks
212212
}
213213
return res
214214
}

0 commit comments

Comments
 (0)