From 767cc29b127b8570333e7acbb28139fcc60efd0c Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Tue, 21 Jan 2025 12:08:05 -0800 Subject: [PATCH] OTEL in task processing (#7103) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What changed? Added support for OpenTelemetry (OTEL) to task processing. ## Why? To extend the OTEL coverage and gain insight into task processing. ## How did you test it? Ran Grafana Tempo locally: Screenshot 2025-01-16 at 6 07 51 PM ## Potential risks I don't expect any negative performance impact on task processing because most operations are noops. ## Documentation ## Is hotfix candidate? --- common/persistence/client/fx.go | 5 ++-- common/telemetry/config.go | 4 +-- common/telemetry/grpc.go | 14 ++++++--- common/telemetry/tags.go | 30 +++++++++++++++++++ service/history/archival_queue_factory.go | 2 ++ .../history/archival_queue_factory_test.go | 2 ++ .../archival_queue_task_executor_test.go | 2 ++ .../history/memory_scheduled_queue_factory.go | 5 ++++ service/history/outbound_queue_factory.go | 1 + service/history/queue_factory_base.go | 3 ++ service/history/queues/executable.go | 25 ++++++++++++++++ service/history/queues/executable_factory.go | 5 ++++ service/history/queues/executable_test.go | 2 ++ .../queues/memory_scheduled_queue_test.go | 2 ++ service/history/queues/queue_base_test.go | 2 ++ .../history/queues/queue_scheduled_test.go | 2 ++ service/history/queues/reader_test.go | 2 ++ service/history/queues/slice_test.go | 2 ++ ...speculative_workflow_task_timeout_queue.go | 5 ++++ .../timer_queue_active_task_executor_test.go | 2 ++ service/history/timer_queue_factory.go | 2 ++ .../timer_queue_standby_task_executor_test.go | 2 ++ ...ransfer_queue_active_task_executor_test.go | 2 ++ service/history/transfer_queue_factory.go | 2 ++ ...ansfer_queue_standby_task_executor_test.go | 2 ++ service/history/visibility_queue_factory.go | 2 ++ .../visibility_queue_task_executor_test.go | 2 ++ 27 files changed, 123 insertions(+), 8 deletions(-) create mode 100644 common/telemetry/tags.go diff --git a/common/persistence/client/fx.go b/common/persistence/client/fx.go index 86ba58516da..6bafa651cf6 100644 --- a/common/persistence/client/fx.go +++ b/common/persistence/client/fx.go @@ -205,8 +205,9 @@ func DataStoreFactoryProvider( dataStoreFactory = faultinjection.NewFaultInjectionDatastoreFactory(defaultStoreCfg.FaultInjection, dataStoreFactory) } - if otel.IsEnabled(tracerProvider) { - dataStoreFactory = telemetry.NewTelemetryDataStoreFactory(dataStoreFactory, logger, tracerProvider.Tracer("persistence")) + tracer := tracerProvider.Tracer("persistence") + if otel.IsEnabled(tracer) { + dataStoreFactory = telemetry.NewTelemetryDataStoreFactory(dataStoreFactory, logger, tracer) } return dataStoreFactory diff --git a/common/telemetry/config.go b/common/telemetry/config.go index 3f88bd1adf2..bf2c0a41ded 100644 --- a/common/telemetry/config.go +++ b/common/telemetry/config.go @@ -433,7 +433,7 @@ func DebugMode() bool { return isDebug } -func IsEnabled(tp trace.TracerProvider) bool { - _, isNoop := tp.(noop.TracerProvider) +func IsEnabled(tp trace.Tracer) bool { + _, isNoop := tp.(noop.Tracer) return !isNoop } diff --git a/common/telemetry/grpc.go b/common/telemetry/grpc.go index 6eee7c494cf..69216708169 100644 --- a/common/telemetry/grpc.go +++ b/common/telemetry/grpc.go @@ -31,6 +31,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace/noop" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/rpc/interceptor/logtags" @@ -66,7 +67,7 @@ func NewServerStatsHandler( tmp propagation.TextMapPropagator, logger log.Logger, ) ServerStatsHandler { - if !IsEnabled(tp) { + if !isEnabled(tp) { return nil } @@ -86,7 +87,7 @@ func NewClientStatsHandler( tp trace.TracerProvider, tmp propagation.TextMapPropagator, ) ClientStatsHandler { - if !IsEnabled(tp) { + if !isEnabled(tp) { return nil } @@ -146,9 +147,9 @@ func (c *customServerStatsHandler) HandleRPC(ctx context.Context, stat stats.RPC var k string switch logTag.Key() { case tag.WorkflowIDKey: - k = "temporalWorkflowID" + k = WorkflowIDKey case tag.WorkflowRunIDKey: - k = "temporalRunID" + k = WorkflowRunIDKey default: continue } @@ -186,3 +187,8 @@ func (c *customServerStatsHandler) TagConn(ctx context.Context, info *stats.Conn func (c *customServerStatsHandler) HandleConn(ctx context.Context, stat stats.ConnStats) { c.wrapped.HandleConn(ctx, stat) } + +func isEnabled(tp trace.TracerProvider) bool { + _, isNoop := tp.(noop.TracerProvider) + return !isNoop +} diff --git a/common/telemetry/tags.go b/common/telemetry/tags.go new file mode 100644 index 00000000000..c39c2ea1f3b --- /dev/null +++ b/common/telemetry/tags.go @@ -0,0 +1,30 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package telemetry + +const ( + WorkflowIDKey = "temporalWorkflowID" + WorkflowRunIDKey = "temporalRunID" +) diff --git a/service/history/archival_queue_factory.go b/service/history/archival_queue_factory.go index ead19acd807..ff38cbffd45 100644 --- a/service/history/archival_queue_factory.go +++ b/service/history/archival_queue_factory.go @@ -116,6 +116,7 @@ func newQueueFactoryBase(params ArchivalQueueFactoryParams) QueueFactoryBase { ), int64(params.Config.ArchivalQueueMaxReaderCount()), ), + Tracer: params.TracerProvider.Tracer("queue.archival"), } } @@ -182,6 +183,7 @@ func (f *archivalQueueFactory) newScheduledQueue(shard shard.Context, executor q shard.GetClusterMetadata(), logger, metricsHandler, + f.Tracer, f.DLQWriter, f.Config.TaskDLQEnabled, f.Config.TaskDLQUnexpectedErrorAttempts, diff --git a/service/history/archival_queue_factory_test.go b/service/history/archival_queue_factory_test.go index ae18966e4ad..7e926a2393e 100644 --- a/service/history/archival_queue_factory_test.go +++ b/service/history/archival_queue_factory_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace/noop" persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common/clock" "go.temporal.io/server/common/cluster" @@ -82,6 +83,7 @@ func TestArchivalQueueFactory(t *testing.T) { TimeSource: clock.NewEventTimeSource(), MetricsHandler: metricsHandler, Logger: log.NewNoopLogger(), + TracerProvider: noop.NewTracerProvider(), }, }) queue := queueFactory.CreateQueue(mockShard, nil) diff --git a/service/history/archival_queue_task_executor_test.go b/service/history/archival_queue_task_executor_test.go index b4104de1209..85ec5530558 100644 --- a/service/history/archival_queue_task_executor_test.go +++ b/service/history/archival_queue_task_executor_test.go @@ -32,6 +32,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace/noop" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" workflowpb "go.temporal.io/api/workflow/v1" @@ -535,6 +536,7 @@ func TestArchivalQueueTaskExecutor(t *testing.T) { mockMetadata, logger, metrics.NoopMetricsHandler, + noop.NewTracerProvider().Tracer(""), ) err := executable.Execute() if len(p.ExpectedErrorSubstrings) > 0 { diff --git a/service/history/memory_scheduled_queue_factory.go b/service/history/memory_scheduled_queue_factory.go index 575278a42bd..c8d51adffba 100644 --- a/service/history/memory_scheduled_queue_factory.go +++ b/service/history/memory_scheduled_queue_factory.go @@ -25,6 +25,7 @@ package history import ( + "go.opentelemetry.io/otel/trace" "go.temporal.io/server/common/clock" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/log" @@ -48,6 +49,7 @@ type ( Config *configs.Config TimeSource clock.TimeSource MetricsHandler metrics.Handler + TracerProvider trace.TracerProvider Logger log.SnTaggedLogger ExecutorWrapper queues.ExecutorWrapper `optional:"true"` @@ -61,6 +63,7 @@ type ( clusterMetadata cluster.Metadata timeSource clock.TimeSource metricsHandler metrics.Handler + tracer trace.Tracer logger log.SnTaggedLogger executorWrapper queues.ExecutorWrapper @@ -88,6 +91,7 @@ func NewMemoryScheduledQueueFactory( clusterMetadata: params.ClusterMetadata, timeSource: params.TimeSource, metricsHandler: metricsHandler, + tracer: params.TracerProvider.Tracer("queue.memory"), logger: logger, executorWrapper: params.ExecutorWrapper, } @@ -129,6 +133,7 @@ func (f *memoryScheduledQueueFactory) CreateQueue( f.clusterMetadata, f.timeSource, f.metricsHandler, + f.tracer, f.logger, ) } diff --git a/service/history/outbound_queue_factory.go b/service/history/outbound_queue_factory.go index 78147635be9..22f0abf56c0 100644 --- a/service/history/outbound_queue_factory.go +++ b/service/history/outbound_queue_factory.go @@ -265,6 +265,7 @@ func (f *outboundQueueFactory) CreateQueue( shardContext.GetClusterMetadata(), logger, metricsHandler, + f.TracerProvider.Tracer("queue.outbound"), f.DLQWriter, f.Config.TaskDLQEnabled, f.Config.TaskDLQUnexpectedErrorAttempts, diff --git a/service/history/queue_factory_base.go b/service/history/queue_factory_base.go index 0be5b212ad9..e1fda38c645 100644 --- a/service/history/queue_factory_base.go +++ b/service/history/queue_factory_base.go @@ -27,6 +27,7 @@ package history import ( "context" + "go.opentelemetry.io/otel/trace" "go.temporal.io/server/common/clock" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/dynamicconfig" @@ -74,6 +75,7 @@ type ( Config *configs.Config TimeSource clock.TimeSource MetricsHandler metrics.Handler + TracerProvider trace.TracerProvider Logger log.SnTaggedLogger SchedulerRateLimiter queues.SchedulerRateLimiter DLQWriter *queues.DLQWriter @@ -86,6 +88,7 @@ type ( HostScheduler queues.Scheduler HostPriorityAssigner queues.PriorityAssigner HostReaderRateLimiter quotas.RequestRateLimiter + Tracer trace.Tracer } QueueFactoriesLifetimeHookParams struct { diff --git a/service/history/queues/executable.go b/service/history/queues/executable.go index 34109378b3c..c7b89536cbb 100644 --- a/service/history/queues/executable.go +++ b/service/history/queues/executable.go @@ -36,6 +36,8 @@ import ( "sync" "time" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/server/common" @@ -50,6 +52,7 @@ import ( "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" ctasks "go.temporal.io/server/common/tasks" + "go.temporal.io/server/common/telemetry" "go.temporal.io/server/common/util" "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/shard" @@ -162,6 +165,7 @@ type ( clusterMetadata cluster.Metadata logger log.Logger metricsHandler metrics.Handler + tracer trace.Tracer dlqWriter *DLQWriter readerID int64 @@ -201,6 +205,7 @@ func NewExecutable( clusterMetadata cluster.Metadata, logger log.Logger, metricsHandler metrics.Handler, + tracer trace.Tracer, opts ...ExecutableOption, ) Executable { params := ExecutableParams{ @@ -241,6 +246,7 @@ func NewExecutable( }, ), metricsHandler: metricsHandler, + tracer: tracer, dlqWriter: params.DLQWriter, dlqEnabled: params.DLQEnabled, maxUnexpectedErrorAttempts: params.MaxUnexpectedErrorAttempts, @@ -277,6 +283,25 @@ func (e *executableImpl) Execute() (retErr error) { ) e.Unlock() + if telemetry.IsEnabled(e.tracer) { + var span trace.Span + ctx, span = e.tracer.Start( + ctx, + fmt.Sprintf("queue.Execute/%v", e.GetType().String()), + trace.WithSpanKind(trace.SpanKindConsumer), + trace.WithAttributes( + attribute.Key(telemetry.WorkflowIDKey).String(e.GetWorkflowID()), + attribute.Key(telemetry.WorkflowRunIDKey).String(e.GetRunID()), + attribute.Key("task-type").String(e.GetType().String()), + attribute.Key("task-id").Int64(e.GetTaskID()))) + defer func() { + if retErr != nil { + span.RecordError(retErr) + } + span.End() + }() + } + defer func() { if panicObj := recover(); panicObj != nil { err, ok := panicObj.(error) diff --git a/service/history/queues/executable_factory.go b/service/history/queues/executable_factory.go index 86630156cbb..44808cbae85 100644 --- a/service/history/queues/executable_factory.go +++ b/service/history/queues/executable_factory.go @@ -25,6 +25,7 @@ package queues import ( + "go.opentelemetry.io/otel/trace" "go.temporal.io/server/common/clock" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/dynamicconfig" @@ -52,6 +53,7 @@ type ( clusterMetadata cluster.Metadata logger log.Logger metricsHandler metrics.Handler + tracer trace.Tracer dlqWriter *DLQWriter dlqEnabled dynamicconfig.BoolPropertyFn attemptsBeforeSendingToDlq dynamicconfig.IntPropertyFn @@ -74,6 +76,7 @@ func NewExecutableFactory( clusterMetadata cluster.Metadata, logger log.Logger, metricsHandler metrics.Handler, + tracer trace.Tracer, dlqWriter *DLQWriter, dlqEnabled dynamicconfig.BoolPropertyFn, attemptsBeforeSendingToDlq dynamicconfig.IntPropertyFn, @@ -90,6 +93,7 @@ func NewExecutableFactory( clusterMetadata: clusterMetadata, logger: logger, metricsHandler: metricsHandler.WithTags(defaultExecutableMetricsTags...), + tracer: tracer, dlqWriter: dlqWriter, dlqEnabled: dlqEnabled, attemptsBeforeSendingToDlq: attemptsBeforeSendingToDlq, @@ -111,6 +115,7 @@ func (f *executableFactoryImpl) NewExecutable(task tasks.Task, readerID int64) E f.clusterMetadata, f.logger, f.metricsHandler, + f.tracer, func(params *ExecutableParams) { params.DLQEnabled = f.dlqEnabled params.DLQWriter = f.dlqWriter diff --git a/service/history/queues/executable_test.go b/service/history/queues/executable_test.go index e1d362f451e..eb58fbf8a05 100644 --- a/service/history/queues/executable_test.go +++ b/service/history/queues/executable_test.go @@ -34,6 +34,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.opentelemetry.io/otel/trace/noop" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/server/common/clock" @@ -1079,6 +1080,7 @@ func (s *executableSuite) newTestExecutable(opts ...option) queues.Executable { s.mockClusterMetadata, log.NewTestLogger(), s.metricsHandler, + noop.NewTracerProvider().Tracer(""), func(params *queues.ExecutableParams) { params.DLQEnabled = p.dlqEnabled params.DLQWriter = p.dlqWriter diff --git a/service/history/queues/memory_scheduled_queue_test.go b/service/history/queues/memory_scheduled_queue_test.go index 6ac30a2a14f..6f27535d206 100644 --- a/service/history/queues/memory_scheduled_queue_test.go +++ b/service/history/queues/memory_scheduled_queue_test.go @@ -32,6 +32,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.opentelemetry.io/otel/trace/noop" "go.temporal.io/server/common/clock" "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" @@ -183,6 +184,7 @@ func (s *memoryScheduledQueueSuite) newSpeculativeWorkflowTaskTimeoutTestExecuta nil, nil, nil, + noop.NewTracerProvider().Tracer(""), ), wttt, ) diff --git a/service/history/queues/queue_base_test.go b/service/history/queues/queue_base_test.go index f60d6a9490a..2b8eb93c198 100644 --- a/service/history/queues/queue_base_test.go +++ b/service/history/queues/queue_base_test.go @@ -34,6 +34,7 @@ import ( "github.com/pborman/uuid" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.opentelemetry.io/otel/trace/noop" enumsspb "go.temporal.io/server/api/enums/v1" persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common/cluster" @@ -567,6 +568,7 @@ func (s *queueBaseSuite) newQueueBase( mockShard.GetClusterMetadata(), s.logger, s.metricsHandler, + noop.NewTracerProvider().Tracer(""), nil, func() bool { return false diff --git a/service/history/queues/queue_scheduled_test.go b/service/history/queues/queue_scheduled_test.go index 67a9d716c05..d1a4172e8d7 100644 --- a/service/history/queues/queue_scheduled_test.go +++ b/service/history/queues/queue_scheduled_test.go @@ -35,6 +35,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.opentelemetry.io/otel/trace/noop" persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/log" @@ -139,6 +140,7 @@ func (s *scheduledQueueSuite) SetupTest() { s.mockShard.GetClusterMetadata(), logger, metrics.NoopMetricsHandler, + noop.NewTracerProvider().Tracer(""), nil, func() bool { return false diff --git a/service/history/queues/reader_test.go b/service/history/queues/reader_test.go index 3cdde44c8f7..ef438a1fad9 100644 --- a/service/history/queues/reader_test.go +++ b/service/history/queues/reader_test.go @@ -33,6 +33,7 @@ import ( "github.com/pborman/uuid" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.opentelemetry.io/otel/trace/noop" "go.temporal.io/server/common/clock" "go.temporal.io/server/common/collection" "go.temporal.io/server/common/dynamicconfig" @@ -88,6 +89,7 @@ func (s *readerSuite) SetupTest() { nil, nil, metrics.NoopMetricsHandler, + noop.NewTracerProvider().Tracer(""), ) }) s.monitor = newMonitor(tasks.CategoryTypeScheduled, clock.NewRealTimeSource(), &MonitorOptions{ diff --git a/service/history/queues/slice_test.go b/service/history/queues/slice_test.go index 6a536f85153..bbccb197fd5 100644 --- a/service/history/queues/slice_test.go +++ b/service/history/queues/slice_test.go @@ -34,6 +34,7 @@ import ( "github.com/pborman/uuid" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.opentelemetry.io/otel/trace/noop" enumsspb "go.temporal.io/server/api/enums/v1" "go.temporal.io/server/common/clock" "go.temporal.io/server/common/collection" @@ -84,6 +85,7 @@ func (s *sliceSuite) SetupTest() { nil, nil, metrics.NoopMetricsHandler, + noop.NewTracerProvider().Tracer(""), ) }) s.monitor = newMonitor(tasks.CategoryTypeScheduled, clock.NewRealTimeSource(), &MonitorOptions{ diff --git a/service/history/queues/speculative_workflow_task_timeout_queue.go b/service/history/queues/speculative_workflow_task_timeout_queue.go index e75dd20169f..748cc441a70 100644 --- a/service/history/queues/speculative_workflow_task_timeout_queue.go +++ b/service/history/queues/speculative_workflow_task_timeout_queue.go @@ -25,6 +25,7 @@ package queues import ( + "go.opentelemetry.io/otel/trace" "go.temporal.io/server/common/clock" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/log" @@ -44,6 +45,7 @@ type ( clusterMetadata cluster.Metadata timeSource clock.TimeSource metricsHandler metrics.Handler + tracer trace.Tracer logger log.SnTaggedLogger } ) @@ -56,6 +58,7 @@ func NewSpeculativeWorkflowTaskTimeoutQueue( clusterMetadata cluster.Metadata, timeSource clock.TimeSource, metricsHandler metrics.Handler, + tracer trace.Tracer, logger log.SnTaggedLogger, ) *SpeculativeWorkflowTaskTimeoutQueue { @@ -74,6 +77,7 @@ func NewSpeculativeWorkflowTaskTimeoutQueue( clusterMetadata: clusterMetadata, timeSource: timeSource, metricsHandler: metricsHandler, + tracer: tracer, logger: logger, } } @@ -105,6 +109,7 @@ func (q SpeculativeWorkflowTaskTimeoutQueue) NotifyNewTasks(ts []tasks.Task) { q.clusterMetadata, q.logger, q.metricsHandler.WithTags(defaultExecutableMetricsTags...), + q.tracer, ), wttt) q.timeoutQueue.Add(executable) } diff --git a/service/history/timer_queue_active_task_executor_test.go b/service/history/timer_queue_active_task_executor_test.go index 9c5e23109f9..4c7e4eb2f8b 100644 --- a/service/history/timer_queue_active_task_executor_test.go +++ b/service/history/timer_queue_active_task_executor_test.go @@ -32,6 +32,7 @@ import ( "github.com/pborman/uuid" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.opentelemetry.io/otel/trace/noop" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" taskqueuepb "go.temporal.io/api/taskqueue/v1" @@ -2054,6 +2055,7 @@ func (s *timerQueueActiveTaskExecutorSuite) newTaskExecutable( s.mockClusterMetadata, nil, metrics.NoopMetricsHandler, + noop.NewTracerProvider().Tracer(""), ) } diff --git a/service/history/timer_queue_factory.go b/service/history/timer_queue_factory.go index 98a98c60fc9..86048165368 100644 --- a/service/history/timer_queue_factory.go +++ b/service/history/timer_queue_factory.go @@ -85,6 +85,7 @@ func NewTimerQueueFactory( ), int64(params.Config.TimerQueueMaxReaderCount()), ), + Tracer: params.TracerProvider.Tracer("queue.timer"), }, } } @@ -176,6 +177,7 @@ func (f *timerQueueFactory) CreateQueue( shardContext.GetClusterMetadata(), logger, metricsHandler, + f.Tracer, f.DLQWriter, f.Config.TaskDLQEnabled, f.Config.TaskDLQUnexpectedErrorAttempts, diff --git a/service/history/timer_queue_standby_task_executor_test.go b/service/history/timer_queue_standby_task_executor_test.go index e151e72af4d..7f4ead375d7 100644 --- a/service/history/timer_queue_standby_task_executor_test.go +++ b/service/history/timer_queue_standby_task_executor_test.go @@ -32,6 +32,7 @@ import ( "github.com/pborman/uuid" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.opentelemetry.io/otel/trace/noop" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" taskqueuepb "go.temporal.io/api/taskqueue/v1" @@ -1967,6 +1968,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) newTaskExecutable( s.mockClusterMetadata, nil, metrics.NoopMetricsHandler, + noop.NewTracerProvider().Tracer(""), ) } diff --git a/service/history/transfer_queue_active_task_executor_test.go b/service/history/transfer_queue_active_task_executor_test.go index 53cc80c6e23..d897b313c14 100644 --- a/service/history/transfer_queue_active_task_executor_test.go +++ b/service/history/transfer_queue_active_task_executor_test.go @@ -33,6 +33,7 @@ import ( "github.com/pborman/uuid" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.opentelemetry.io/otel/trace/noop" commandpb "go.temporal.io/api/command/v1" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" @@ -2938,6 +2939,7 @@ func (s *transferQueueActiveTaskExecutorSuite) newTaskExecutable( s.mockClusterMetadata, nil, metrics.NoopMetricsHandler, + noop.NewTracerProvider().Tracer(""), ) } diff --git a/service/history/transfer_queue_factory.go b/service/history/transfer_queue_factory.go index bdf054d63e3..e76cf420a17 100644 --- a/service/history/transfer_queue_factory.go +++ b/service/history/transfer_queue_factory.go @@ -88,6 +88,7 @@ func NewTransferQueueFactory( ), int64(params.Config.TransferQueueMaxReaderCount()), ), + Tracer: params.TracerProvider.Tracer("queue.transfer"), }, } } @@ -170,6 +171,7 @@ func (f *transferQueueFactory) CreateQueue( shardContext.GetClusterMetadata(), logger, metricsHandler, + f.Tracer, f.DLQWriter, f.Config.TaskDLQEnabled, f.Config.TaskDLQUnexpectedErrorAttempts, diff --git a/service/history/transfer_queue_standby_task_executor_test.go b/service/history/transfer_queue_standby_task_executor_test.go index 89c8fcdc9da..eef0f0dbdbf 100644 --- a/service/history/transfer_queue_standby_task_executor_test.go +++ b/service/history/transfer_queue_standby_task_executor_test.go @@ -34,6 +34,7 @@ import ( "github.com/pborman/uuid" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.opentelemetry.io/otel/trace/noop" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" @@ -1146,6 +1147,7 @@ func (s *transferQueueStandbyTaskExecutorSuite) newTaskExecutable( s.mockClusterMetadata, nil, metrics.NoopMetricsHandler, + noop.NewTracerProvider().Tracer(""), ) } diff --git a/service/history/visibility_queue_factory.go b/service/history/visibility_queue_factory.go index 66fd4f308b4..e8262d5f7be 100644 --- a/service/history/visibility_queue_factory.go +++ b/service/history/visibility_queue_factory.go @@ -81,6 +81,7 @@ func NewVisibilityQueueFactory( ), int64(params.Config.VisibilityQueueMaxReaderCount()), ), + Tracer: params.TracerProvider.Tracer("queue.visibility"), }, } } @@ -140,6 +141,7 @@ func (f *visibilityQueueFactory) CreateQueue( shard.GetClusterMetadata(), logger, metricsHandler, + f.Tracer, f.DLQWriter, f.Config.TaskDLQEnabled, f.Config.TaskDLQUnexpectedErrorAttempts, diff --git a/service/history/visibility_queue_task_executor_test.go b/service/history/visibility_queue_task_executor_test.go index c37f30f588a..25c5acbe47b 100644 --- a/service/history/visibility_queue_task_executor_test.go +++ b/service/history/visibility_queue_task_executor_test.go @@ -32,6 +32,7 @@ import ( "github.com/pborman/uuid" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.opentelemetry.io/otel/trace/noop" commonpb "go.temporal.io/api/common/v1" taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" @@ -721,6 +722,7 @@ func (s *visibilityQueueTaskExecutorSuite) newTaskExecutable( s.mockShard.GetClusterMetadata(), nil, metrics.NoopMetricsHandler, + noop.NewTracerProvider().Tracer(""), ) }