Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ability to log stream selectors before service name detection #14154

Merged
merged 8 commits into from
Oct 2, 2024
2 changes: 1 addition & 1 deletion clients/pkg/promtail/targets/lokipush/pushtarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (t *PushTarget) run() error {
func (t *PushTarget) handleLoki(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), util_log.Logger)
userID, _ := tenant.TenantID(r.Context())
req, err := push.ParseRequest(logger, userID, r, nil, push.EmptyLimits{}, push.ParseLokiRequest, nil)
req, err := push.ParseRequest(logger, userID, r, nil, push.EmptyLimits{}, push.ParseLokiRequest, nil, false)
if err != nil {
level.Warn(t.logger).Log("msg", "failed to parse incoming push request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
Expand Down
5 changes: 3 additions & 2 deletions pkg/distributor/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
pushRequestParser = d.RequestParserWrapper(pushRequestParser)
}

req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser, d.usageTracker)
logPushRequestStreams := d.tenantConfigs.LogPushRequestStreams(tenantID)
req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser, d.usageTracker, logPushRequestStreams)
if err != nil {
if d.tenantConfigs.LogPushRequest(tenantID) {
level.Debug(logger).Log(
Expand All @@ -73,7 +74,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
return
}

if d.tenantConfigs.LogPushRequestStreams(tenantID) {
if logPushRequestStreams {
var sb strings.Builder
for _, s := range req.Streams {
sb.WriteString(s.Labels)
Expand Down
11 changes: 10 additions & 1 deletion pkg/distributor/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http/httptest"
"testing"

"github.com/go-kit/log"
"github.com/grafana/dskit/user"

"github.com/grafana/loki/v3/pkg/loghttp/push"
Expand Down Expand Up @@ -114,6 +115,14 @@ func Test_OtelErrorHeaderInterceptor(t *testing.T) {
}
}

func stubParser(_ string, _ *http.Request, _ push.TenantsRetention, _ push.Limits, _ push.UsageTracker) (*logproto.PushRequest, *push.Stats, error) {
func stubParser(
_ string,
_ *http.Request,
_ push.TenantsRetention,
_ push.Limits,
_ push.UsageTracker,
_ bool,
_ log.Logger,
) (*logproto.PushRequest, *push.Stats, error) {
return &logproto.PushRequest{}, &push.Stats{}, nil
}
33 changes: 30 additions & 3 deletions pkg/loghttp/push/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
"io"
"net/http"
"sort"
"strings"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -40,14 +43,14 @@ func newPushStats() *Stats {
}
}

func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) {
func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error) {
stats := newPushStats()
otlpLogs, err := extractLogs(r, stats)
if err != nil {
return nil, nil, err
}

req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), limits.DiscoverServiceName(userID), tracker, stats)
req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), limits.DiscoverServiceName(userID), tracker, stats, logPushRequestStreams, logger)
return req, stats, nil
}

Expand Down Expand Up @@ -98,7 +101,7 @@ func extractLogs(r *http.Request, pushStats *Stats) (plog.Logs, error) {
return req.Logs(), nil
}

func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, discoverServiceName []string, tracker UsageTracker, stats *Stats) *logproto.PushRequest {
func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, discoverServiceName []string, tracker UsageTracker, stats *Stats, logPushRequestStreams bool, logger log.Logger) *logproto.PushRequest {
if ld.LogRecordCount() == 0 {
return &logproto.PushRequest{}
}
Expand All @@ -113,6 +116,10 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten

resourceAttributesAsStructuredMetadata := make(push.LabelsAdapter, 0, resAttrs.Len())
streamLabels := make(model.LabelSet, 30) // we have a default labels limit of 30 so just initialize the map of same size
var pushedLabels model.LabelSet
if logPushRequestStreams {
pushedLabels = make(model.LabelSet, 30)
}

shouldDiscoverServiceName := len(discoverServiceName) > 0 && !stats.IsAggregatedMetric
hasServiceName := false
Expand All @@ -129,6 +136,9 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
if action == IndexLabel {
for _, lbl := range attributeAsLabels {
streamLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
if logPushRequestStreams && pushedLabels != nil {
pushedLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
}

if !hasServiceName && shouldDiscoverServiceName {
for _, labelName := range discoverServiceName {
Expand All @@ -151,6 +161,23 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
streamLabels[model.LabelName(LabelServiceName)] = model.LabelValue(ServiceUnknown)
}

if logPushRequestStreams {
var sb strings.Builder
sb.WriteString("{")
labels := make([]string, 0, len(pushedLabels))
for name, value := range pushedLabels {
labels = append(labels, fmt.Sprintf(`%s="%s"`, name, value))
}
sb.WriteString(strings.Join(labels, ", "))
sb.WriteString("}")

level.Debug(logger).Log(
"msg", "OTLP push request stream before service name discovery",
"stream", sb.String(),
"service_name", streamLabels[model.LabelName(LabelServiceName)],
)
}

if err := streamLabels.Validate(); err != nil {
stats.Errs = append(stats.Errs, fmt.Errorf("invalid labels: %w", err))
continue
Expand Down
14 changes: 13 additions & 1 deletion pkg/loghttp/push/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -508,7 +509,18 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
stats := newPushStats()
tracker := NewMockTracker()
pushReq := otlpToLokiPushRequest(context.Background(), tc.generateLogs(), "foo", fakeRetention{}, tc.otlpConfig, defaultServiceDetection, tracker, stats)
pushReq := otlpToLokiPushRequest(
context.Background(),
tc.generateLogs(),
"foo",
fakeRetention{},
tc.otlpConfig,
defaultServiceDetection,
tracker,
stats,
false,
log.NewNopLogger(),
)
require.Equal(t, tc.expectedPushRequest, *pushReq)
require.Equal(t, tc.expectedStats, *stats)

Expand Down
23 changes: 18 additions & 5 deletions pkg/loghttp/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (EmptyLimits) DiscoverServiceName(string) []string {
}

type (
RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error)
RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error)
RequestParserWrapper func(inner RequestParser) RequestParser
)

Expand All @@ -106,8 +106,8 @@ type Stats struct {
IsAggregatedMetric bool
}

func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, pushRequestParser RequestParser, tracker UsageTracker) (*logproto.PushRequest, error) {
req, pushStats, err := pushRequestParser(userID, r, tenantsRetention, limits, tracker)
func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, pushRequestParser RequestParser, tracker UsageTracker, logPushRequestStreams bool) (*logproto.PushRequest, error) {
req, pushStats, err := pushRequestParser(userID, r, tenantsRetention, limits, tracker, logPushRequestStreams, logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -164,7 +164,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete
return req, nil
}

func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) {
func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error) {
// Body
var body io.Reader
// bodySize should always reflect the compressed size of the request body
Expand Down Expand Up @@ -247,8 +247,13 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe
pushStats.IsAggregatedMetric = true
}

var beforeServiceName string
if logPushRequestStreams {
beforeServiceName = lbs.String()
}

serviceName := ServiceUnknown
if !lbs.Has(LabelServiceName) && len(discoverServiceName) > 0 && !pushStats.IsAggregatedMetric {
serviceName := ServiceUnknown
for _, labelName := range discoverServiceName {
if labelVal := lbs.Get(labelName); labelVal != "" {
serviceName = labelVal
Expand All @@ -264,6 +269,14 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe
lbs = lb.Del(LabelServiceName).Labels()
}

if logPushRequestStreams {
level.Debug(logger).Log(
"msg", "push request stream before service name discovery",
"labels", beforeServiceName,
"service_name", serviceName,
)
}

var retentionPeriod time.Duration
if tenantsRetention != nil {
retentionPeriod = tenantsRetention.RetentionPeriodFor(userID, lbs)
Expand Down
19 changes: 14 additions & 5 deletions pkg/loghttp/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,16 @@ func TestParseRequest(t *testing.T) {
}

tracker := NewMockTracker()
data, err := ParseRequest(util_log.Logger, "fake", request, nil, &fakeLimits{enabled: test.enableServiceDiscovery}, ParseLokiRequest, tracker)
data, err := ParseRequest(
util_log.Logger,
"fake",
request,
nil,
&fakeLimits{enabled: test.enableServiceDiscovery},
ParseLokiRequest,
tracker,
false,
)

structuredMetadataBytesReceived := int(structuredMetadataBytesReceivedStats.Value()["total"].(int64)) - previousStructuredMetadataBytesReceived
previousStructuredMetadataBytesReceived += structuredMetadataBytesReceived
Expand Down Expand Up @@ -355,7 +364,7 @@ func Test_ServiceDetection(t *testing.T) {
request := createRequest("/loki/api/v1/push", strings.NewReader(body))

limits := &fakeLimits{enabled: true, labels: []string{"foo"}}
data, err := ParseRequest(util_log.Logger, "fake", request, nil, limits, ParseLokiRequest, tracker)
data, err := ParseRequest(util_log.Logger, "fake", request, nil, limits, ParseLokiRequest, tracker, false)

require.NoError(t, err)
require.Equal(t, labels.FromStrings("foo", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels)
Expand All @@ -366,7 +375,7 @@ func Test_ServiceDetection(t *testing.T) {
request := createRequest("/otlp/v1/push", bytes.NewReader(body))

limits := &fakeLimits{enabled: true}
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker)
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker, false)
require.NoError(t, err)
require.Equal(t, labels.FromStrings("k8s_job_name", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels)
})
Expand All @@ -380,7 +389,7 @@ func Test_ServiceDetection(t *testing.T) {
labels: []string{"special"},
indexAttributes: []string{"special"},
}
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker)
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker, false)
require.NoError(t, err)
require.Equal(t, labels.FromStrings("special", "sauce", LabelServiceName, "sauce").String(), data.Streams[0].Labels)
})
Expand All @@ -394,7 +403,7 @@ func Test_ServiceDetection(t *testing.T) {
labels: []string{"special"},
indexAttributes: []string{},
}
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker)
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker, false)
require.NoError(t, err)
require.Equal(t, labels.FromStrings(LabelServiceName, ServiceUnknown).String(), data.Streams[0].Labels)
})
Expand Down
Loading