Skip to content

Commit

Permalink
Remove operation and service labels from Nexus schedule to start metr…
Browse files Browse the repository at this point in the history
…ic (#1830)

The labels are not relevant when polling from the Nexus task queue where any request (task) may be served.
  • Loading branch information
bergundy authored Feb 18, 2025
1 parent 623badd commit 8f38795
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 15 deletions.
14 changes: 8 additions & 6 deletions internal/internal_nexus_task_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ func (ntp *nexusTaskPoller) ProcessTask(task interface{}) error {
return nil
}

executionStartTime := time.Now()

// Schedule-to-start (from the time the request hit the frontend).
// Note that this metric does not include the service and operation name as they are not relevant when polling from
// the Nexus task queue.
scheduleToStartLatency := executionStartTime.Sub(response.GetRequest().GetScheduledTime().AsTime())
ntp.metricsHandler.WithTags(metrics.TaskQueueTags(ntp.taskQueueName)).Timer(metrics.NexusTaskScheduleToStartLatency).Record(scheduleToStartLatency)

nctx, handlerErr := ntp.taskHandler.newNexusOperationContext(response)
if handlerErr != nil {
// context wasn't propagated to us, use a background context.
Expand All @@ -143,12 +151,6 @@ func (ntp *nexusTaskPoller) ProcessTask(task interface{}) error {
return err
}

executionStartTime := time.Now()

// Schedule-to-start (from the time the request hit the frontend).
scheduleToStartLatency := executionStartTime.Sub(response.GetRequest().GetScheduledTime().AsTime())
nctx.MetricsHandler.Timer(metrics.NexusTaskScheduleToStartLatency).Record(scheduleToStartLatency)

// Process the nexus task.
res, failure, err := ntp.taskHandler.ExecuteContext(nctx, response)

Expand Down
25 changes: 16 additions & 9 deletions test/nexus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,16 @@ func (tc *testContext) newNexusClient(t *testing.T, service string) *nexus.HTTPC
return nc
}

func (tc *testContext) requireTaskQueueTimer(t *assert.CollectT, metric string) {
assert.True(t, slices.ContainsFunc(tc.metricsHandler.Timers(), func(ct *metrics.CapturedTimer) bool {
return ct.Name == metric && ct.Tags[metrics.TaskQueueTagName] == tc.taskQueue
}))
}

func (tc *testContext) requireTimer(t *assert.CollectT, metric, service, operation string) {
assert.True(t, slices.ContainsFunc(tc.metricsHandler.Timers(), func(ct *metrics.CapturedTimer) bool {
return ct.Name == metric &&
ct.Tags[metrics.TaskQueueTagName] == tc.taskQueue &&
ct.Tags[metrics.NexusServiceTagName] == service &&
ct.Tags[metrics.NexusOperationTagName] == operation
}))
Expand Down Expand Up @@ -245,8 +252,8 @@ func TestNexusSyncOperation(t *testing.T) {
require.Equal(t, "ok", result)

require.EventuallyWithT(t, func(t *assert.CollectT) {
tc.requireTaskQueueTimer(t, metrics.NexusTaskScheduleToStartLatency)
tc.requireTimer(t, metrics.NexusTaskEndToEndLatency, service.Name, syncOp.Name())
tc.requireTimer(t, metrics.NexusTaskScheduleToStartLatency, service.Name, syncOp.Name())
tc.requireTimer(t, metrics.NexusTaskExecutionLatency, service.Name, syncOp.Name())
}, time.Second*3, time.Millisecond*100)
})
Expand All @@ -260,8 +267,8 @@ func TestNexusSyncOperation(t *testing.T) {
require.Equal(t, "fail", opErr.Cause.Error())

require.EventuallyWithT(t, func(t *assert.CollectT) {
tc.requireTaskQueueTimer(t, metrics.NexusTaskScheduleToStartLatency)
tc.requireTimer(t, metrics.NexusTaskEndToEndLatency, service.Name, syncOp.Name())
tc.requireTimer(t, metrics.NexusTaskScheduleToStartLatency, service.Name, syncOp.Name())
tc.requireTimer(t, metrics.NexusTaskExecutionLatency, service.Name, syncOp.Name())
tc.requireFailureCounter(t, service.Name, syncOp.Name(), "operation_failed")
}, time.Second*3, time.Millisecond*100)
Expand All @@ -276,8 +283,8 @@ func TestNexusSyncOperation(t *testing.T) {
require.Contains(t, handlerErr.Cause.Error(), "arbitrary error message")

require.EventuallyWithT(t, func(t *assert.CollectT) {
tc.requireTaskQueueTimer(t, metrics.NexusTaskScheduleToStartLatency)
tc.requireTimer(t, metrics.NexusTaskEndToEndLatency, service.Name, syncOp.Name())
tc.requireTimer(t, metrics.NexusTaskScheduleToStartLatency, service.Name, syncOp.Name())
tc.requireTimer(t, metrics.NexusTaskExecutionLatency, service.Name, syncOp.Name())
tc.requireFailureCounter(t, service.Name, syncOp.Name(), "handler_error_INTERNAL")
}, time.Second*3, time.Millisecond*100)
Expand All @@ -291,8 +298,8 @@ func TestNexusSyncOperation(t *testing.T) {
require.Contains(t, handlerErr.Cause.Error(), "handlererror")

require.EventuallyWithT(t, func(t *assert.CollectT) {
tc.requireTaskQueueTimer(t, metrics.NexusTaskScheduleToStartLatency)
tc.requireTimer(t, metrics.NexusTaskEndToEndLatency, service.Name, syncOp.Name())
tc.requireTimer(t, metrics.NexusTaskScheduleToStartLatency, service.Name, syncOp.Name())
tc.requireTimer(t, metrics.NexusTaskExecutionLatency, service.Name, syncOp.Name())
tc.requireFailureCounter(t, service.Name, syncOp.Name(), "handler_error_BAD_REQUEST")
}, time.Second*3, time.Millisecond*100)
Expand All @@ -306,8 +313,8 @@ func TestNexusSyncOperation(t *testing.T) {
require.Contains(t, handlerErr.Cause.Error(), "faking workflow already started")

require.EventuallyWithT(t, func(t *assert.CollectT) {
tc.requireTaskQueueTimer(t, metrics.NexusTaskScheduleToStartLatency)
tc.requireTimer(t, metrics.NexusTaskEndToEndLatency, service.Name, syncOp.Name())
tc.requireTimer(t, metrics.NexusTaskScheduleToStartLatency, service.Name, syncOp.Name())
tc.requireTimer(t, metrics.NexusTaskExecutionLatency, service.Name, syncOp.Name())
tc.requireFailureCounter(t, service.Name, syncOp.Name(), "handler_error_BAD_REQUEST")
}, time.Second*3, time.Millisecond*100)
Expand All @@ -321,8 +328,8 @@ func TestNexusSyncOperation(t *testing.T) {
require.Contains(t, handlerErr.Cause.Error(), "fake app error for test")

require.EventuallyWithT(t, func(t *assert.CollectT) {
tc.requireTaskQueueTimer(t, metrics.NexusTaskScheduleToStartLatency)
tc.requireTimer(t, metrics.NexusTaskEndToEndLatency, service.Name, syncOp.Name())
tc.requireTimer(t, metrics.NexusTaskScheduleToStartLatency, service.Name, syncOp.Name())
tc.requireTimer(t, metrics.NexusTaskExecutionLatency, service.Name, syncOp.Name())
tc.requireFailureCounter(t, service.Name, syncOp.Name(), "handler_error_INTERNAL")
}, time.Second*3, time.Millisecond*100)
Expand All @@ -336,8 +343,8 @@ func TestNexusSyncOperation(t *testing.T) {
require.Contains(t, handlerErr.Cause.Error(), "fake app error for test")

require.EventuallyWithT(t, func(t *assert.CollectT) {
tc.requireTaskQueueTimer(t, metrics.NexusTaskScheduleToStartLatency)
tc.requireTimer(t, metrics.NexusTaskEndToEndLatency, service.Name, syncOp.Name())
tc.requireTimer(t, metrics.NexusTaskScheduleToStartLatency, service.Name, syncOp.Name())
tc.requireTimer(t, metrics.NexusTaskExecutionLatency, service.Name, syncOp.Name())
tc.requireFailureCounter(t, service.Name, syncOp.Name(), "handler_error_BAD_REQUEST")
}, time.Second*3, time.Millisecond*100)
Expand All @@ -351,8 +358,8 @@ func TestNexusSyncOperation(t *testing.T) {
require.Contains(t, handlerErr.Cause.Error(), "panic: panic requested")

require.EventuallyWithT(t, func(t *assert.CollectT) {
tc.requireTaskQueueTimer(t, metrics.NexusTaskScheduleToStartLatency)
tc.requireTimer(t, metrics.NexusTaskEndToEndLatency, service.Name, syncOp.Name())
tc.requireTimer(t, metrics.NexusTaskScheduleToStartLatency, service.Name, syncOp.Name())
tc.requireTimer(t, metrics.NexusTaskExecutionLatency, service.Name, syncOp.Name())
tc.requireFailureCounter(t, service.Name, syncOp.Name(), "handler_error_INTERNAL")
}, time.Second*3, time.Millisecond*100)
Expand All @@ -368,8 +375,8 @@ func TestNexusSyncOperation(t *testing.T) {
require.Equal(t, nexus.HandlerErrorTypeUpstreamTimeout, handlerErr.Type)

require.EventuallyWithT(t, func(t *assert.CollectT) {
tc.requireTaskQueueTimer(t, metrics.NexusTaskScheduleToStartLatency)
// NOTE metrics.NexusTaskEndToEndLatency isn't recorded on timeouts.
tc.requireTimer(t, metrics.NexusTaskScheduleToStartLatency, service.Name, syncOp.Name())
tc.requireTimer(t, metrics.NexusTaskExecutionLatency, service.Name, syncOp.Name())
tc.requireFailureCounter(t, service.Name, syncOp.Name(), "timeout")
}, time.Second*3, time.Millisecond*100)
Expand Down

0 comments on commit 8f38795

Please sign in to comment.