Skip to content

Commit

Permalink
Fix two bugs in the archival queue task executor
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Jan 18, 2023
1 parent c24c83f commit 07c3ad5
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 43 deletions.
75 changes: 36 additions & 39 deletions service/history/archival_queue_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,11 @@ func (e *archivalQueueTaskExecutor) processArchiveExecutionTask(ctx context.Cont
if err != nil {
return err
}
_, err = e.archiver.Archive(ctx, request)
if err != nil {
return err
if len(request.Targets) > 0 {
_, err = e.archiver.Archive(ctx, request)
if err != nil {
return err
}
}
return e.addDeletionTask(ctx, logger, task, request.CloseTime)
}
Expand Down Expand Up @@ -154,52 +156,47 @@ func (e *archivalQueueTaskExecutor) getArchiveTaskRequest(
return nil, err
}

var historyURI, visibilityURI carchiver.URI
var targets []archival.Target
if e.shardContext.GetArchivalMetadata().GetVisibilityConfig().ClusterConfiguredForArchival() &&
namespaceEntry.VisibilityArchivalState().State == enumspb.ARCHIVAL_STATE_ENABLED {
targets = append(targets, archival.TargetVisibility)
visibilityURIString := namespaceEntry.VisibilityArchivalState().URI
visibilityURI, err = carchiver.NewURI(visibilityURIString)
if err != nil {
e.metricsHandler.Counter(metrics.ArchivalTaskInvalidURI.GetMetricName()).Record(
1,
metrics.NamespaceTag(namespaceName.String()),
metrics.FailureTag("invalid_visibility_uri"),
)
logger.Error(
"Failed to parse visibility URI.",
tag.ArchivalURI(visibilityURIString),
tag.Error(err),
)
return nil, fmt.Errorf("failed to parse visibility URI for archival task: %w", err)
}
}
if e.shardContext.GetArchivalMetadata().GetHistoryConfig().ClusterConfiguredForArchival() &&
namespaceEntry.HistoryArchivalState().State == enumspb.ARCHIVAL_STATE_ENABLED {
historyURIString := namespaceEntry.HistoryArchivalState().URI
historyURI, err = carchiver.NewURI(historyURIString)
if err != nil {
e.metricsHandler.Counter(metrics.ArchivalTaskInvalidURI.GetMetricName()).Record(
1,
metrics.NamespaceTag(namespaceName.String()),
metrics.FailureTag("invalid_history_uri"),
)
logger.Error(
"Failed to parse history URI.",
tag.ArchivalURI(historyURIString),
tag.Error(err),
)
return nil, fmt.Errorf("failed to parse history URI for archival task: %w", err)
}
targets = append(targets, archival.TargetHistory)
}
if len(targets) == 0 {
return nil, fmt.Errorf(
"no archival targets configured for archive execution task: %+v",
task.WorkflowKey,
)
}

historyURIString := namespaceEntry.HistoryArchivalState().URI
historyURI, err := carchiver.NewURI(historyURIString)
if err != nil {
e.metricsHandler.Counter(metrics.ArchivalTaskInvalidURI.GetMetricName()).Record(
1,
metrics.NamespaceTag(namespaceName.String()),
metrics.FailureTag("invalid_history_uri"),
)
logger.Error(
"Failed to parse history URI.",
tag.ArchivalURI(historyURIString),
tag.Error(err),
)
return nil, fmt.Errorf("failed to parse history URI for archival task: %w", err)
}
visibilityURIString := namespaceEntry.VisibilityArchivalState().URI
visibilityURI, err := carchiver.NewURI(visibilityURIString)
if err != nil {
e.metricsHandler.Counter(metrics.ArchivalTaskInvalidURI.GetMetricName()).Record(
1,
metrics.NamespaceTag(namespaceName.String()),
metrics.FailureTag("invalid_visibility_uri"),
)
logger.Error(
"Failed to parse visibility URI.",
tag.ArchivalURI(visibilityURIString),
tag.Error(err),
)
return nil, fmt.Errorf("failed to parse visibility URI for archival task: %w", err)
}
workflowAttributes, err := e.relocatableAttributesFetcher.Fetch(ctx, mutableState)
if err != nil {
return nil, err
Expand Down
16 changes: 12 additions & 4 deletions service/history/archival_queue_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,18 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
}
},
},
{
Name: "URIs are not read for empty targets",
Configure: func(p *params) {
p.HistoryConfig.ClusterEnabled = false
p.VisibilityConfig.ClusterEnabled = false
// we set the URIs to invalid values which would produce errors if they were read
// we should not read these URIs because history and visibility archival are disabled
p.HistoryURI = "invalid_uri"
p.VisibilityURI = "invalid_uri"
p.ExpectArchive = false
},
},
{
Name: "history archival disabled for namespace",
Configure: func(p *params) {
Expand Down Expand Up @@ -105,11 +117,7 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
Configure: func(p *params) {
p.VisibilityConfig.NamespaceArchivalState = carchiver.ArchivalDisabled
p.HistoryConfig.NamespaceArchivalState = carchiver.ArchivalDisabled
p.ExpectedErrorSubstrings = []string{
"no archival targets",
}
p.ExpectArchive = false
p.ExpectAddTask = false
},
},
{
Expand Down

0 comments on commit 07c3ad5

Please sign in to comment.