Skip to content

Commit 5039b1d

Browse files
committed
Reduce HLL memory usage
1 parent 448aafe commit 5039b1d

File tree

5 files changed

+13
-72
lines changed

5 files changed

+13
-72
lines changed

pkg/logcli/query/query_test.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,8 @@ import (
55
"context"
66
"errors"
77
"fmt"
8-
"iter"
98
"os"
109
"path/filepath"
11-
"slices"
1210
"strings"
1311
"testing"
1412
"time"
@@ -413,7 +411,7 @@ type testQueryClient struct {
413411
}
414412

415413
func newTestQueryClient(testStreams ...logproto.Stream) *testQueryClient {
416-
q := logql.NewMockQuerier(0, func() iter.Seq2[int, logproto.Stream] { return slices.All(testStreams)})
414+
q := logql.NewMockQuerier(0, testStreams)
417415
e := logql.NewEngine(logql.EngineOpts{}, q, logql.NoLimits, log.NewNopLogger())
418416
return &testQueryClient{
419417
engine: e,

pkg/logql/downstream_test.go

+7-9
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@ package logql
33
import (
44
"context"
55
"fmt"
6-
"iter"
76
"math"
8-
"slices"
97
"testing"
108
"time"
119

@@ -87,7 +85,7 @@ func TestMappingEquivalence(t *testing.T) {
8785
} {
8886
q := NewMockQuerier(
8987
shards,
90-
func() iter.Seq2[int, logproto.Stream] { return slices.All(streams) },
88+
streams,
9189
)
9290

9391
opts := EngineOpts{}
@@ -195,7 +193,7 @@ func TestMappingEquivalenceSketches(t *testing.T) {
195193
} {
196194
q := NewMockQuerier(
197195
shards,
198-
func() iter.Seq2[int, logproto.Stream] { return slices.All(streams) },
196+
streams,
199197
)
200198

201199
opts := EngineOpts{
@@ -296,7 +294,7 @@ func TestApproxTopkSketches(t *testing.T) {
296294
shardedQuery string
297295
regularQuery string
298296
realtiveError float64
299-
// cardinalityEstimate int
297+
//cardinalityEstimate int
300298
}{
301299
// Note:our data generation results in less spread between topk things for 10k streams than for 100k streams
302300
// if we have 1k streams, we can get much more accurate results for topk 10 than topk 100
@@ -306,7 +304,7 @@ func TestApproxTopkSketches(t *testing.T) {
306304
shardedQuery: `approx_topk(3, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`,
307305
regularQuery: `topk(3, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`,
308306
realtiveError: 0.0012,
309-
// cardinalityEstimate: 3,
307+
//cardinalityEstimate: 3,
310308
},
311309
{
312310
labelShards: 10,
@@ -349,7 +347,7 @@ func TestApproxTopkSketches(t *testing.T) {
349347

350348
q := NewMockQuerier(
351349
tc.labelShards,
352-
func() iter.Seq2[int, logproto.Stream] { return slices.All(streams) },
350+
streams,
353351
)
354352

355353
opts := EngineOpts{
@@ -429,7 +427,7 @@ func TestShardCounter(t *testing.T) {
429427
} {
430428
q := NewMockQuerier(
431429
shards,
432-
func() iter.Seq2[int, logproto.Stream] { return slices.All(streams) },
430+
streams,
433431
)
434432

435433
opts := EngineOpts{}
@@ -688,7 +686,7 @@ func TestRangeMappingEquivalence(t *testing.T) {
688686
} {
689687
q := NewMockQuerier(
690688
shards,
691-
func() iter.Seq2[int, logproto.Stream] { return slices.All(streams) },
689+
streams,
692690
)
693691

694692
opts := EngineOpts{}

pkg/logql/engine_test.go

-50
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@ import (
55
"context"
66
"errors"
77
"fmt"
8-
goiter "iter"
98
"math"
10-
"math/rand"
119
"strings"
1210
"testing"
1311
"time"
@@ -2512,54 +2510,6 @@ func BenchmarkRangeQuery1000000(b *testing.B) {
25122510
benchmarkRangeQuery(int64(1000000), b)
25132511
}
25142512

2515-
func BenchmarkHighCardinalityInstantQuery(b *testing.B) {
2516-
b.ReportAllocs()
2517-
2518-
mockQuerier := NewMockQuerier(1, func() goiter.Seq2[int, logproto.Stream] { return generateLogs(1_000_000, 100_000) })
2519-
2520-
eng := NewEngine(EngineOpts{}, mockQuerier, NoLimits, log.NewNopLogger())
2521-
start := time.Unix(1, 0)
2522-
b.ResetTimer()
2523-
2524-
for i := 0; i < b.N; i++ {
2525-
params, err := NewLiteralParams(`sum by(cardinality) (sum_over_time({app="foo"} | logfmt | unwrap value [1s]))`, start, start, 0, 0, logproto.BACKWARD, 1000, nil, nil)
2526-
require.NoError(b, err)
2527-
q := eng.Query(params)
2528-
2529-
res, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
2530-
if err != nil {
2531-
b.Fatal(err)
2532-
}
2533-
result = res.Data
2534-
if result == nil {
2535-
b.Fatal("unexpected nil result")
2536-
}
2537-
}
2538-
}
2539-
2540-
func generateLogs(lines int, cardinality int) goiter.Seq2[int, logproto.Stream] {
2541-
batchSize := 10
2542-
return func(yield func(int, logproto.Stream) bool) {
2543-
for batch := range lines / batchSize {
2544-
entries := make([]logproto.Entry, batchSize)
2545-
for i := 0; i < batchSize; i++ {
2546-
line := batch*batchSize + i
2547-
entries[i] = logproto.Entry{
2548-
Timestamp: time.Unix(0, int64(batch)),
2549-
Line: fmt.Sprintf("cardinality=%d batch=%d line=%d value=%f", i%cardinality, batch, line, rand.Float64()),
2550-
}
2551-
}
2552-
stream := logproto.Stream{
2553-
Entries: entries,
2554-
Labels: `{app="foo"}`,
2555-
}
2556-
if !yield(batch, stream) {
2557-
return
2558-
}
2559-
}
2560-
}
2561-
}
2562-
25632513
var result promql_parser.Value
25642514

25652515
func benchmarkRangeQuery(testsize int64, b *testing.B) {

pkg/logql/explain_test.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ package logql
22

33
import (
44
"context"
5-
"iter"
6-
"slices"
75
"testing"
86
"time"
97

@@ -12,7 +10,6 @@ import (
1210

1311
"github.com/grafana/dskit/user"
1412

15-
"github.com/grafana/loki/v3/pkg/logproto"
1613
"github.com/grafana/loki/v3/pkg/logql/syntax"
1714
)
1815

@@ -21,7 +18,7 @@ func TestExplain(t *testing.T) {
2118

2219
// TODO(karsten): Ideally the querier and downstreamer are not required
2320
// to create the step evaluators.
24-
querier := NewMockQuerier(4, func() iter.Seq2[int, logproto.Stream] { return slices.All(make([]logproto.Stream, 0)) })
21+
querier := NewMockQuerier(4, nil)
2522
opts := EngineOpts{}
2623
regular := NewEngine(opts, querier, NoLimits, log.NewNopLogger())
2724

pkg/logql/test_utils.go

+4-6
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package logql
33
import (
44
"context"
55
"fmt"
6-
goiter "iter"
76
logger "log"
87
"math/rand"
98
"sort"
@@ -26,7 +25,7 @@ import (
2625

2726
const ConCurrency = 100
2827

29-
func NewMockQuerier(shards int, streams func() goiter.Seq2[int, logproto.Stream]) MockQuerier {
28+
func NewMockQuerier(shards int, streams []logproto.Stream) MockQuerier {
3029
return MockQuerier{
3130
shards: shards,
3231
streams: streams,
@@ -36,7 +35,7 @@ func NewMockQuerier(shards int, streams func() goiter.Seq2[int, logproto.Stream]
3635
// Shard aware mock querier
3736
type MockQuerier struct {
3837
shards int
39-
streams func() goiter.Seq2[int, logproto.Stream]
38+
streams []logproto.Stream
4039
}
4140

4241
func (q MockQuerier) extractOldShard(xs []string) (*index.ShardAnnotation, error) {
@@ -75,7 +74,7 @@ func (q MockQuerier) SelectLogs(_ context.Context, req SelectLogParams) (iter.En
7574
var matched []logproto.Stream
7675

7776
outer:
78-
for _, stream := range q.streams() {
77+
for _, stream := range q.streams {
7978
ls := mustParseLabels(stream.Labels)
8079

8180
// filter by shard if requested
@@ -113,7 +112,6 @@ outer:
113112
return iter.NewSortEntryIterator(streamIters, req.Direction), nil
114113
}
115114

116-
// TODO: check if this is the same as in storage
117115
func processStream(in []logproto.Stream, pipeline log.Pipeline) []logproto.Stream {
118116
resByStream := map[string]*logproto.Stream{}
119117

@@ -203,7 +201,7 @@ func (q MockQuerier) SelectSamples(_ context.Context, req SelectSampleParams) (i
203201
var matched []logproto.Stream
204202

205203
outer:
206-
for _, stream := range q.streams() {
204+
for _, stream := range q.streams {
207205
ls := mustParseLabels(stream.Labels)
208206

209207
// filter by shard if requested

0 commit comments

Comments
 (0)