From 71a13c67cfef1d2524a59f36404c56f3ba508556 Mon Sep 17 00:00:00 2001 From: Michael Snowden Date: Wed, 18 Jan 2023 15:47:31 -0800 Subject: [PATCH] Fix two bugs in the archival queue task executor --- common/metrics/metric_defs.go | 3 + .../history/archival_queue_task_executor.go | 75 +++++++++---------- .../archival_queue_task_executor_test.go | 16 +++- 3 files changed, 51 insertions(+), 43 deletions(-) diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index e4cac2ce829..9cc12988144 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -67,6 +67,9 @@ const ( MutableStateCacheTypeTagValue = "mutablestate" EventsCacheTypeTagValue = "events" + + InvalidHistoryURITagValue = "invalid_history_uri" + InvalidVisibilityURITagValue = "invalid_visibility_uri" ) // Common service base metrics diff --git a/service/history/archival_queue_task_executor.go b/service/history/archival_queue_task_executor.go index cbf27bf00c8..a4d7ba93bc6 100644 --- a/service/history/archival_queue_task_executor.go +++ b/service/history/archival_queue_task_executor.go @@ -112,9 +112,11 @@ func (e *archivalQueueTaskExecutor) processArchiveExecutionTask(ctx context.Cont if err != nil { return err } - _, err = e.archiver.Archive(ctx, request) - if err != nil { - return err + if len(request.Targets) > 0 { + _, err = e.archiver.Archive(ctx, request) + if err != nil { + return err + } } return e.addDeletionTask(ctx, logger, task, request.CloseTime) } @@ -154,52 +156,47 @@ func (e *archivalQueueTaskExecutor) getArchiveTaskRequest( return nil, err } + var historyURI, visibilityURI carchiver.URI var targets []archival.Target if e.shardContext.GetArchivalMetadata().GetVisibilityConfig().ClusterConfiguredForArchival() && namespaceEntry.VisibilityArchivalState().State == enumspb.ARCHIVAL_STATE_ENABLED { targets = append(targets, archival.TargetVisibility) + visibilityURIString := namespaceEntry.VisibilityArchivalState().URI + visibilityURI, err = carchiver.NewURI(visibilityURIString) + if err != nil { + e.metricsHandler.Counter(metrics.ArchivalTaskInvalidURI.GetMetricName()).Record( + 1, + metrics.NamespaceTag(namespaceName.String()), + metrics.FailureTag(metrics.InvalidVisibilityURITagValue), + ) + logger.Error( + "Failed to parse visibility URI.", + tag.ArchivalURI(visibilityURIString), + tag.Error(err), + ) + return nil, fmt.Errorf("failed to parse visibility URI for archival task: %w", err) + } } if e.shardContext.GetArchivalMetadata().GetHistoryConfig().ClusterConfiguredForArchival() && namespaceEntry.HistoryArchivalState().State == enumspb.ARCHIVAL_STATE_ENABLED { + historyURIString := namespaceEntry.HistoryArchivalState().URI + historyURI, err = carchiver.NewURI(historyURIString) + if err != nil { + e.metricsHandler.Counter(metrics.ArchivalTaskInvalidURI.GetMetricName()).Record( + 1, + metrics.NamespaceTag(namespaceName.String()), + metrics.FailureTag(metrics.InvalidHistoryURITagValue), + ) + logger.Error( + "Failed to parse history URI.", + tag.ArchivalURI(historyURIString), + tag.Error(err), + ) + return nil, fmt.Errorf("failed to parse history URI for archival task: %w", err) + } targets = append(targets, archival.TargetHistory) } - if len(targets) == 0 { - return nil, fmt.Errorf( - "no archival targets configured for archive execution task: %+v", - task.WorkflowKey, - ) - } - historyURIString := namespaceEntry.HistoryArchivalState().URI - historyURI, err := carchiver.NewURI(historyURIString) - if err != nil { - e.metricsHandler.Counter(metrics.ArchivalTaskInvalidURI.GetMetricName()).Record( - 1, - metrics.NamespaceTag(namespaceName.String()), - metrics.FailureTag("invalid_history_uri"), - ) - logger.Error( - "Failed to parse history URI.", - tag.ArchivalURI(historyURIString), - tag.Error(err), - ) - return nil, fmt.Errorf("failed to parse history URI for archival task: %w", err) - } - visibilityURIString := namespaceEntry.VisibilityArchivalState().URI - visibilityURI, err := carchiver.NewURI(visibilityURIString) - if err != nil { - e.metricsHandler.Counter(metrics.ArchivalTaskInvalidURI.GetMetricName()).Record( - 1, - metrics.NamespaceTag(namespaceName.String()), - metrics.FailureTag("invalid_visibility_uri"), - ) - logger.Error( - "Failed to parse visibility URI.", - tag.ArchivalURI(visibilityURIString), - tag.Error(err), - ) - return nil, fmt.Errorf("failed to parse visibility URI for archival task: %w", err) - } workflowAttributes, err := e.relocatableAttributesFetcher.Fetch(ctx, mutableState) if err != nil { return nil, err diff --git a/service/history/archival_queue_task_executor_test.go b/service/history/archival_queue_task_executor_test.go index aee4d611d78..9d3ab1561c2 100644 --- a/service/history/archival_queue_task_executor_test.go +++ b/service/history/archival_queue_task_executor_test.go @@ -73,6 +73,18 @@ func TestArchivalQueueTaskExecutor(t *testing.T) { } }, }, + { + Name: "URIs are not read for empty targets", + Configure: func(p *params) { + p.HistoryConfig.ClusterEnabled = false + p.VisibilityConfig.ClusterEnabled = false + // we set the URIs to invalid values which would produce errors if they were read + // we should not read these URIs because history and visibility archival are disabled + p.HistoryURI = "invalid_uri" + p.VisibilityURI = "invalid_uri" + p.ExpectArchive = false + }, + }, { Name: "history archival disabled for namespace", Configure: func(p *params) { @@ -105,11 +117,7 @@ func TestArchivalQueueTaskExecutor(t *testing.T) { Configure: func(p *params) { p.VisibilityConfig.NamespaceArchivalState = carchiver.ArchivalDisabled p.HistoryConfig.NamespaceArchivalState = carchiver.ArchivalDisabled - p.ExpectedErrorSubstrings = []string{ - "no archival targets", - } p.ExpectArchive = false - p.ExpectAddTask = false }, }, {