Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pyroscope.receive_http): ensure consistent service_name label handling #2675

Merged
merged 2 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ Main (unreleased)
- Reduce CPU usage of `loki.source.windowsevent` by up to 85% by updating the bookmark file every 10 seconds instead of after every event and by
optimizing the retrieval of the process name. (@wildum)

- Ensure consistent service_name label handling in `pyroscope.receive_http` to match Pyroscope's behavior. (@marcsanmi)

### Bugfixes

- Fix log rotation for Windows in `loki.source.file` by refactoring the component to use the runner pkg. This should also reduce CPU consumption when tailing a lot of files in a dynamic environment. (@wildum)
Expand Down
4 changes: 3 additions & 1 deletion internal/component/pyroscope/appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
)

const (
LabelNameDelta = "__delta__"
LabelNameDelta = "__delta__"
LabelName = "__name__"
LabelServiceName = "service_name"
)

var NoopAppendable = AppendableFunc(func(_ context.Context, _ labels.Labels, _ []*RawSample) error { return nil })
Expand Down
21 changes: 20 additions & 1 deletion internal/component/pyroscope/receive_http/receive_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,9 @@ func (c *Component) Push(ctx context.Context, req *connect.Request[pushv1.PushRe
for idx := range req.Msg.Series {
lb.Reset(nil)
setLabelBuilderFromAPI(lb, req.Msg.Series[idx].Labels)
err := appendable.Append(ctx, lb.Labels(), apiToAlloySamples(req.Msg.Series[idx].Samples))
// Ensure service_name label is set
lbls := ensureServiceName(lb.Labels())
err := appendable.Append(ctx, lbls, apiToAlloySamples(req.Msg.Series[idx].Samples))
if err != nil {
errs = errors.Join(
errs,
Expand Down Expand Up @@ -240,6 +242,9 @@ func (c *Component) handleIngest(w http.ResponseWriter, r *http.Request) {
}
}

// Ensure service_name label is set
lbls = ensureServiceName(lbls)

// Read the entire body into memory
// This matches how Append() handles profile data (as RawProfile),
// but means the entire profile will be held in memory
Expand Down Expand Up @@ -292,3 +297,17 @@ func (c *Component) shutdownServer() {
c.server = nil
}
}

// rename __name__ to service_name
func ensureServiceName(lbls labels.Labels) labels.Labels {
builder := labels.NewBuilder(lbls)
originalName := lbls.Get(pyroscope.LabelName)

if !lbls.Has(pyroscope.LabelServiceName) {
builder.Set(pyroscope.LabelServiceName, originalName)
} else {
builder.Set("app_name", originalName)
}

return builder.Labels()
}
56 changes: 46 additions & 10 deletions internal/component/pyroscope/receive_http/receive_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func TestForwardsProfilesIngest(t *testing.T) {
appendableErrors: []error{nil, nil},
expectedStatus: http.StatusOK,
expectedForwards: 2,
expectedLabels: map[string]string{
"__name__": "test_app",
"service_name": "test_app",
},
},
{
name: "Large profile with custom headers",
Expand All @@ -67,6 +71,10 @@ func TestForwardsProfilesIngest(t *testing.T) {
appendableErrors: []error{nil},
expectedStatus: http.StatusOK,
expectedForwards: 1,
expectedLabels: map[string]string{
"__name__": "test_app",
"service_name": "test_app",
},
},
{
name: "Invalid method",
Expand Down Expand Up @@ -111,6 +119,10 @@ func TestForwardsProfilesIngest(t *testing.T) {
appendableErrors: []error{fmt.Errorf("error1"), fmt.Errorf("error2")},
expectedStatus: http.StatusInternalServerError,
expectedForwards: 2,
expectedLabels: map[string]string{
"__name__": "test_app",
"service_name": "test_app",
},
},
{
name: "One appendable fails, one succeeds",
Expand All @@ -122,6 +134,10 @@ func TestForwardsProfilesIngest(t *testing.T) {
appendableErrors: []error{fmt.Errorf("error"), nil},
expectedStatus: http.StatusInternalServerError,
expectedForwards: 2,
expectedLabels: map[string]string{
"__name__": "test_app",
"service_name": "test_app",
},
},
{
name: "Valid labels are parsed and forwarded",
Expand All @@ -134,9 +150,10 @@ func TestForwardsProfilesIngest(t *testing.T) {
expectedStatus: http.StatusOK,
expectedForwards: 2,
expectedLabels: map[string]string{
"__name__": "test.app",
"env": "prod",
"region": "us-east",
"__name__": "test.app",
"service_name": "test.app",
"env": "prod",
"region": "us-east",
},
},
{
Expand All @@ -150,7 +167,24 @@ func TestForwardsProfilesIngest(t *testing.T) {
expectedStatus: http.StatusOK,
expectedForwards: 2,
expectedLabels: map[string]string{
"__name__": "test.app", // Only __name__ is preserved
"__name__": "test.app",
"service_name": "test.app",
},
},
{
name: "existing service_name sets app_name from __name__",
profileSize: 1024,
method: "POST",
path: "/ingest",
queryParams: "name=test.app{service_name=my-service}",
headers: map[string]string{"Content-Type": "application/octet-stream"},
appendableErrors: []error{nil},
expectedStatus: http.StatusOK,
expectedForwards: 1,
expectedLabels: map[string]string{
"__name__": "test.app",
"service_name": "my-service",
"app_name": "test.app",
},
},
}
Expand Down Expand Up @@ -333,12 +367,14 @@ func verifyForwardedProfiles(
testApp, ok := app.(*testAppender)
require.True(t, ok, "Appendable is not a testAppender")

// Verify labels if name parameter exists and is valid
if nameParam := testApp.lastProfile.URL.Query().Get("name"); nameParam != "" {
ls, err := labelset.Parse(nameParam)
if err == nil {
require.Equal(t, ls.Labels(), testApp.lastProfile.Labels.Map(),
"Labels mismatch for appendable %d", i)
// Skip name parameter label check if we're testing service_name behavior
if expectedLabels == nil || expectedLabels["service_name"] == "" {
if nameParam := testApp.lastProfile.URL.Query().Get("name"); nameParam != "" {
ls, err := labelset.Parse(nameParam)
if err == nil {
require.Equal(t, ls.Labels(), testApp.lastProfile.Labels.Map(),
"Labels mismatch for appendable %d", i)
}
}
}

Expand Down
12 changes: 11 additions & 1 deletion internal/component/pyroscope/write/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,8 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco
if !profile.Labels.IsEmpty() {
ls := labelset.New(make(map[string]string))

profile.Labels.Range(func(l labels.Label) {
finalLabels := ensureNameMatchesService(profile.Labels)
finalLabels.Range(func(l labels.Label) {
ls.Add(l.Name, l.Value)
})

Expand Down Expand Up @@ -456,3 +457,12 @@ func (i *agentInterceptor) WrapStreamingClient(next connect.StreamingClientFunc)
func (i *agentInterceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc {
return next
}

func ensureNameMatchesService(lbls labels.Labels) labels.Labels {
if serviceName := lbls.Get(pyroscope.LabelServiceName); serviceName != "" {
builder := labels.NewBuilder(lbls)
builder.Set(pyroscope.LabelName, serviceName)
return builder.Labels()
}
return lbls
}
62 changes: 62 additions & 0 deletions internal/component/pyroscope/write/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,3 +363,65 @@ func Test_Write_AppendIngest(t *testing.T) {
require.NoError(t, err)
require.Equal(t, serverCount, appendCount.Load())
}

func TestAppendIngestLabelTransformation(t *testing.T) {
var (
export Exports
appendCount = atomic.NewInt32(0)
)

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
appendCount.Inc()

// Parse labels from query
ls, err := labelset.Parse(r.URL.Query().Get("name"))
require.NoError(t, err)
labels := ls.Labels()

// Verify __name__ matches service_name after transformation
require.Equal(t, "my-service-grafana", labels["__name__"])
require.Equal(t, "my-service-grafana", labels["service_name"])

w.WriteHeader(http.StatusOK)
}))
defer server.Close()

// Create component with a relabel rule that modifies service_name
argument := DefaultArguments()
argument.Endpoints = []*EndpointOptions{{
URL: server.URL,
RemoteTimeout: GetDefaultEndpointOptions().RemoteTimeout,
}}

var wg sync.WaitGroup
wg.Add(1)
c, err := New(component.Options{
ID: "test-write",
Logger: util.TestAlloyLogger(t),
Registerer: prometheus.NewRegistry(),
OnStateChange: func(e component.Exports) {
defer wg.Done()
export = e.(Exports)
},
}, argument)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go c.Run(ctx)
wg.Wait()
require.NotNil(t, export.Receiver)

// Send profile
incomingProfile := &pyroscope.IncomingProfile{
Labels: labels.FromMap(map[string]string{
"__name__": "original-name",
"service_name": "my-service-grafana",
}),
URL: &url.URL{Path: "/ingest"},
}

err = export.Receiver.Appender().AppendIngest(context.Background(), incomingProfile)
require.NoError(t, err)
require.Equal(t, int32(1), appendCount.Load())
}
Loading