Skip to content

Commit cfef679

Browse files
committed
feat: ability to log stream selectors before service name detection (#14154)
(cherry picked from commit d7ff426)
1 parent 4e1d2ea commit cfef679

File tree

7 files changed

+89
-18
lines changed

7 files changed

+89
-18
lines changed

clients/pkg/promtail/targets/lokipush/pushtarget.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func (t *PushTarget) run() error {
111111
func (t *PushTarget) handleLoki(w http.ResponseWriter, r *http.Request) {
112112
logger := util_log.WithContext(r.Context(), util_log.Logger)
113113
userID, _ := tenant.TenantID(r.Context())
114-
req, err := push.ParseRequest(logger, userID, r, nil, push.EmptyLimits{}, push.ParseLokiRequest, nil)
114+
req, err := push.ParseRequest(logger, userID, r, nil, push.EmptyLimits{}, push.ParseLokiRequest, nil, false)
115115
if err != nil {
116116
level.Warn(t.logger).Log("msg", "failed to parse incoming push request", "err", err.Error())
117117
http.Error(w, err.Error(), http.StatusBadRequest)

pkg/distributor/http.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
5858
pushRequestParser = d.RequestParserWrapper(pushRequestParser)
5959
}
6060

61-
req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser, d.usageTracker)
61+
logPushRequestStreams := d.tenantConfigs.LogPushRequestStreams(tenantID)
62+
req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser, d.usageTracker, logPushRequestStreams)
6263
if err != nil {
6364
if d.tenantConfigs.LogPushRequest(tenantID) {
6465
level.Debug(logger).Log(
@@ -73,7 +74,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
7374
return
7475
}
7576

76-
if d.tenantConfigs.LogPushRequestStreams(tenantID) {
77+
if logPushRequestStreams {
7778
var sb strings.Builder
7879
for _, s := range req.Streams {
7980
sb.WriteString(s.Labels)

pkg/distributor/http_test.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"net/http/httptest"
88
"testing"
99

10+
"github.com/go-kit/log"
1011
"github.com/grafana/dskit/user"
1112

1213
"github.com/grafana/loki/v3/pkg/loghttp/push"
@@ -114,6 +115,14 @@ func Test_OtelErrorHeaderInterceptor(t *testing.T) {
114115
}
115116
}
116117

117-
func stubParser(_ string, _ *http.Request, _ push.TenantsRetention, _ push.Limits, _ push.UsageTracker) (*logproto.PushRequest, *push.Stats, error) {
118+
func stubParser(
119+
_ string,
120+
_ *http.Request,
121+
_ push.TenantsRetention,
122+
_ push.Limits,
123+
_ push.UsageTracker,
124+
_ bool,
125+
_ log.Logger,
126+
) (*logproto.PushRequest, *push.Stats, error) {
118127
return &logproto.PushRequest{}, &push.Stats{}, nil
119128
}

pkg/loghttp/push/otlp.go

+30-3
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@ import (
88
"io"
99
"net/http"
1010
"sort"
11+
"strings"
1112
"time"
1213

14+
"github.com/go-kit/log"
15+
"github.com/go-kit/log/level"
1316
"github.com/pkg/errors"
1417
"github.com/prometheus/common/model"
1518
"github.com/prometheus/prometheus/model/labels"
@@ -40,14 +43,14 @@ func newPushStats() *Stats {
4043
}
4144
}
4245

43-
func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) {
46+
func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error) {
4447
stats := newPushStats()
4548
otlpLogs, err := extractLogs(r, stats)
4649
if err != nil {
4750
return nil, nil, err
4851
}
4952

50-
req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), limits.DiscoverServiceName(userID), tracker, stats)
53+
req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), limits.DiscoverServiceName(userID), tracker, stats, logPushRequestStreams, logger)
5154
return req, stats, nil
5255
}
5356

@@ -98,7 +101,7 @@ func extractLogs(r *http.Request, pushStats *Stats) (plog.Logs, error) {
98101
return req.Logs(), nil
99102
}
100103

101-
func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, discoverServiceName []string, tracker UsageTracker, stats *Stats) *logproto.PushRequest {
104+
func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, discoverServiceName []string, tracker UsageTracker, stats *Stats, logPushRequestStreams bool, logger log.Logger) *logproto.PushRequest {
102105
if ld.LogRecordCount() == 0 {
103106
return &logproto.PushRequest{}
104107
}
@@ -113,6 +116,10 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
113116

114117
resourceAttributesAsStructuredMetadata := make(push.LabelsAdapter, 0, resAttrs.Len())
115118
streamLabels := make(model.LabelSet, 30) // we have a default labels limit of 30 so just initialize the map of same size
119+
var pushedLabels model.LabelSet
120+
if logPushRequestStreams {
121+
pushedLabels = make(model.LabelSet, 30)
122+
}
116123

117124
shouldDiscoverServiceName := len(discoverServiceName) > 0 && !stats.IsAggregatedMetric
118125
hasServiceName := false
@@ -129,6 +136,9 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
129136
if action == IndexLabel {
130137
for _, lbl := range attributeAsLabels {
131138
streamLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
139+
if logPushRequestStreams && pushedLabels != nil {
140+
pushedLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
141+
}
132142

133143
if !hasServiceName && shouldDiscoverServiceName {
134144
for _, labelName := range discoverServiceName {
@@ -151,6 +161,23 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
151161
streamLabels[model.LabelName(LabelServiceName)] = model.LabelValue(ServiceUnknown)
152162
}
153163

164+
if logPushRequestStreams {
165+
var sb strings.Builder
166+
sb.WriteString("{")
167+
labels := make([]string, 0, len(pushedLabels))
168+
for name, value := range pushedLabels {
169+
labels = append(labels, fmt.Sprintf(`%s="%s"`, name, value))
170+
}
171+
sb.WriteString(strings.Join(labels, ", "))
172+
sb.WriteString("}")
173+
174+
level.Debug(logger).Log(
175+
"msg", "OTLP push request stream before service name discovery",
176+
"stream", sb.String(),
177+
"service_name", streamLabels[model.LabelName(LabelServiceName)],
178+
)
179+
}
180+
154181
if err := streamLabels.Validate(); err != nil {
155182
stats.Errs = append(stats.Errs, fmt.Errorf("invalid labels: %w", err))
156183
continue

pkg/loghttp/push/otlp_test.go

+13-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"testing"
88
"time"
99

10+
"github.com/go-kit/log"
1011
"github.com/prometheus/prometheus/model/labels"
1112
"github.com/prometheus/prometheus/model/relabel"
1213
"github.com/stretchr/testify/require"
@@ -508,7 +509,18 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
508509
t.Run(tc.name, func(t *testing.T) {
509510
stats := newPushStats()
510511
tracker := NewMockTracker()
511-
pushReq := otlpToLokiPushRequest(context.Background(), tc.generateLogs(), "foo", fakeRetention{}, tc.otlpConfig, defaultServiceDetection, tracker, stats)
512+
pushReq := otlpToLokiPushRequest(
513+
context.Background(),
514+
tc.generateLogs(),
515+
"foo",
516+
fakeRetention{},
517+
tc.otlpConfig,
518+
defaultServiceDetection,
519+
tracker,
520+
stats,
521+
false,
522+
log.NewNopLogger(),
523+
)
512524
require.Equal(t, tc.expectedPushRequest, *pushReq)
513525
require.Equal(t, tc.expectedStats, *stats)
514526

pkg/loghttp/push/push.go

+18-5
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func (EmptyLimits) DiscoverServiceName(string) []string {
8484
}
8585

8686
type (
87-
RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error)
87+
RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error)
8888
RequestParserWrapper func(inner RequestParser) RequestParser
8989
)
9090

@@ -106,8 +106,8 @@ type Stats struct {
106106
IsAggregatedMetric bool
107107
}
108108

109-
func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, pushRequestParser RequestParser, tracker UsageTracker) (*logproto.PushRequest, error) {
110-
req, pushStats, err := pushRequestParser(userID, r, tenantsRetention, limits, tracker)
109+
func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, pushRequestParser RequestParser, tracker UsageTracker, logPushRequestStreams bool) (*logproto.PushRequest, error) {
110+
req, pushStats, err := pushRequestParser(userID, r, tenantsRetention, limits, tracker, logPushRequestStreams, logger)
111111
if err != nil {
112112
return nil, err
113113
}
@@ -164,7 +164,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete
164164
return req, nil
165165
}
166166

167-
func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) {
167+
func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error) {
168168
// Body
169169
var body io.Reader
170170
// bodySize should always reflect the compressed size of the request body
@@ -247,8 +247,13 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe
247247
pushStats.IsAggregatedMetric = true
248248
}
249249

250+
var beforeServiceName string
251+
if logPushRequestStreams {
252+
beforeServiceName = lbs.String()
253+
}
254+
255+
serviceName := ServiceUnknown
250256
if !lbs.Has(LabelServiceName) && len(discoverServiceName) > 0 && !pushStats.IsAggregatedMetric {
251-
serviceName := ServiceUnknown
252257
for _, labelName := range discoverServiceName {
253258
if labelVal := lbs.Get(labelName); labelVal != "" {
254259
serviceName = labelVal
@@ -264,6 +269,14 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe
264269
lbs = lb.Del(LabelServiceName).Labels()
265270
}
266271

272+
if logPushRequestStreams {
273+
level.Debug(logger).Log(
274+
"msg", "push request stream before service name discovery",
275+
"labels", beforeServiceName,
276+
"service_name", serviceName,
277+
)
278+
}
279+
267280
var retentionPeriod time.Duration
268281
if tenantsRetention != nil {
269282
retentionPeriod = tenantsRetention.RetentionPeriodFor(userID, lbs)

pkg/loghttp/push/push_test.go

+14-5
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,16 @@ func TestParseRequest(t *testing.T) {
262262
}
263263

264264
tracker := NewMockTracker()
265-
data, err := ParseRequest(util_log.Logger, "fake", request, nil, &fakeLimits{enabled: test.enableServiceDiscovery}, ParseLokiRequest, tracker)
265+
data, err := ParseRequest(
266+
util_log.Logger,
267+
"fake",
268+
request,
269+
nil,
270+
&fakeLimits{enabled: test.enableServiceDiscovery},
271+
ParseLokiRequest,
272+
tracker,
273+
false,
274+
)
266275

267276
structuredMetadataBytesReceived := int(structuredMetadataBytesReceivedStats.Value()["total"].(int64)) - previousStructuredMetadataBytesReceived
268277
previousStructuredMetadataBytesReceived += structuredMetadataBytesReceived
@@ -355,7 +364,7 @@ func Test_ServiceDetection(t *testing.T) {
355364
request := createRequest("/loki/api/v1/push", strings.NewReader(body))
356365

357366
limits := &fakeLimits{enabled: true, labels: []string{"foo"}}
358-
data, err := ParseRequest(util_log.Logger, "fake", request, nil, limits, ParseLokiRequest, tracker)
367+
data, err := ParseRequest(util_log.Logger, "fake", request, nil, limits, ParseLokiRequest, tracker, false)
359368

360369
require.NoError(t, err)
361370
require.Equal(t, labels.FromStrings("foo", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels)
@@ -366,7 +375,7 @@ func Test_ServiceDetection(t *testing.T) {
366375
request := createRequest("/otlp/v1/push", bytes.NewReader(body))
367376

368377
limits := &fakeLimits{enabled: true}
369-
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker)
378+
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker, false)
370379
require.NoError(t, err)
371380
require.Equal(t, labels.FromStrings("k8s_job_name", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels)
372381
})
@@ -380,7 +389,7 @@ func Test_ServiceDetection(t *testing.T) {
380389
labels: []string{"special"},
381390
indexAttributes: []string{"special"},
382391
}
383-
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker)
392+
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker, false)
384393
require.NoError(t, err)
385394
require.Equal(t, labels.FromStrings("special", "sauce", LabelServiceName, "sauce").String(), data.Streams[0].Labels)
386395
})
@@ -394,7 +403,7 @@ func Test_ServiceDetection(t *testing.T) {
394403
labels: []string{"special"},
395404
indexAttributes: []string{},
396405
}
397-
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker)
406+
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker, false)
398407
require.NoError(t, err)
399408
require.Equal(t, labels.FromStrings(LabelServiceName, ServiceUnknown).String(), data.Streams[0].Labels)
400409
})

0 commit comments

Comments
 (0)