diff --git a/service/history/archival/archiver.go b/service/history/archival/archiver.go index 07a079c537d..06ae7129fd9 100644 --- a/service/history/archival/archiver.go +++ b/service/history/archival/archiver.go @@ -36,6 +36,7 @@ import ( commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" + "go.temporal.io/server/common/persistence/visibility/manager" "go.uber.org/multierr" archiverspb "go.temporal.io/server/api/archiver/v1" @@ -92,10 +93,12 @@ type ( } archiver struct { - archiverProvider provider.ArchiverProvider - metricsHandler metrics.Handler - logger log.Logger - rateLimiter quotas.RateLimiter + archiverProvider provider.ArchiverProvider + metricsHandler metrics.Handler + logger log.Logger + rateLimiter quotas.RateLimiter + searchAttributeProvider searchattribute.Provider + visibilityManager manager.VisibilityManager } ) @@ -110,12 +113,16 @@ func NewArchiver( logger log.Logger, metricsHandler metrics.Handler, rateLimiter quotas.RateLimiter, + searchAttributeProvider searchattribute.Provider, + visibilityManger manager.VisibilityManager, ) Archiver { return &archiver{ - archiverProvider: archiverProvider, - metricsHandler: metricsHandler.WithTags(metrics.OperationTag(metrics.ArchiverClientScope)), - logger: logger, - rateLimiter: rateLimiter, + archiverProvider: archiverProvider, + metricsHandler: metricsHandler.WithTags(metrics.OperationTag(metrics.ArchiverClientScope)), + logger: logger, + rateLimiter: rateLimiter, + searchAttributeProvider: searchAttributeProvider, + visibilityManager: visibilityManger, } } @@ -129,20 +136,27 @@ func (a *archiver) Archive(ctx context.Context, request *Request) (res *Response tag.ArchivalRequestWorkflowID(request.WorkflowID), tag.ArchivalRequestRunID(request.RunID), ) + defer func(start time.Time) { metricsScope := a.metricsHandler + status := "ok" if err != nil { status = "err" + var rateLimitExceededErr *serviceerror.ResourceExhausted + if errors.As(err, &rateLimitExceededErr) { status = "rate_limit_exceeded" } + logger.Warn("failed to archive workflow", tag.Error(err)) } + metricsScope.Timer(metrics.ArchiverArchiveLatency.GetMetricName()). Record(time.Since(start), metrics.StringTag("status", status)) }(time.Now()) + numTargets := len(request.Targets) if err := a.rateLimiter.WaitN(ctx, numTargets); err != nil { return nil, &serviceerror.ResourceExhausted{ @@ -150,27 +164,36 @@ func (a *archiver) Archive(ctx context.Context, request *Request) (res *Response Message: fmt.Sprintf("archival rate limited: %s", err.Error()), } } + var wg sync.WaitGroup + errs := make([]error, numTargets) + for i, target := range request.Targets { wg.Add(1) + i := i + switch target { case TargetHistory: go func() { defer wg.Done() + errs[i] = a.archiveHistory(ctx, request, logger) }() case TargetVisibility: go func() { defer wg.Done() + errs[i] = a.archiveVisibility(ctx, request, logger) }() default: return nil, fmt.Errorf("unknown archival target: %s", target) } } + wg.Wait() + return &Response{}, multierr.Combine(errs...) } @@ -212,11 +235,18 @@ func (a *archiver) archiveVisibility(ctx context.Context, request *Request, logg return err } - // It is safe to pass nil to typeMap here because search attributes type must be embedded by caller. - searchAttributes, err := searchattribute.Stringify(request.SearchAttributes, nil) + // The types of the search attributes may not be embedded in the request, + // so we fetch them from the search attributes provider here. + saTypeMap, err := a.searchAttributeProvider.GetSearchAttributes(a.visibilityManager.GetIndexName(), false) + if err != nil { + return err + } + + searchAttributes, err := searchattribute.Stringify(request.SearchAttributes, &saTypeMap) if err != nil { return err } + return visibilityArchiver.Archive(ctx, request.VisibilityURI, &archiverspb.VisibilityRecord{ NamespaceId: request.NamespaceID, Namespace: request.Namespace, @@ -238,11 +268,14 @@ func (a *archiver) archiveVisibility(ctx context.Context, request *Request, logg // statement (this would make the err always nil). func (a *archiver) recordArchiveTargetResult(logger log.Logger, startTime time.Time, target Target, err *error) { duration := time.Since(startTime) + status := "ok" if *err != nil { status = "err" + logger.Error("failed to archive target", tag.NewStringTag("target", string(target)), tag.Error(*err)) } + tags := []metrics.Tag{ metrics.StringTag("target", string(target)), metrics.StringTag("status", status), diff --git a/service/history/archival/archiver_test.go b/service/history/archival/archiver_test.go index 797480c5f23..ecee80abd81 100644 --- a/service/history/archival/archiver_test.go +++ b/service/history/archival/archiver_test.go @@ -35,13 +35,16 @@ import ( "go.uber.org/fx" "go.uber.org/multierr" + "go.temporal.io/api/common/v1" carchiver "go.temporal.io/server/common/archiver" "go.temporal.io/server/common/archiver/provider" "go.temporal.io/server/common/log" - "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/payload" + "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/quotas" "go.temporal.io/server/common/sdk" + "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/common/testing/mocksdk" "go.temporal.io/server/service/history/configs" ) @@ -53,33 +56,23 @@ func TestArchiver(t *testing.T) { ArchiveVisibilityErr error RateLimiterWaitErr error Targets []Target + SearchAttributes *common.SearchAttributes + SearchAttributesErr error + NameTypeMap searchattribute.NameTypeMap + NameTypeMapErr error ExpectArchiveHistory bool ExpectArchiveVisibility bool - ExpectedMetrics []expectedMetric ExpectedReturnErrors []string }{ { Name: "No targets", Targets: []Target{}, - - ExpectedMetrics: []expectedMetric{ - { - Metric: metrics.ArchiverArchiveLatency.GetMetricName(), - Tags: []metrics.Tag{metrics.StringTag("status", "ok")}, - }, - }, }, { Name: "Invalid target", Targets: []Target{Target("oops")}, - ExpectedMetrics: []expectedMetric{ - { - Metric: metrics.ArchiverArchiveLatency.GetMetricName(), - Tags: []metrics.Tag{metrics.StringTag("status", "err")}, - }, - }, ExpectedReturnErrors: []string{ "unknown archival target: oops", }, @@ -89,19 +82,6 @@ func TestArchiver(t *testing.T) { Targets: []Target{TargetHistory}, ArchiveHistoryErr: nil, - ExpectedMetrics: []expectedMetric{ - { - Metric: metrics.ArchiverArchiveLatency.GetMetricName(), - Tags: []metrics.Tag{metrics.StringTag("status", "ok")}, - }, - { - Metric: metrics.ArchiverArchiveTargetLatency.GetMetricName(), - Tags: []metrics.Tag{ - metrics.StringTag("target", "history"), - metrics.StringTag("status", "ok"), - }, - }, - }, ExpectArchiveHistory: true, }, { @@ -110,38 +90,12 @@ func TestArchiver(t *testing.T) { ArchiveHistoryErr: errors.New("example archive history error"), ExpectArchiveHistory: true, - ExpectedMetrics: []expectedMetric{ - { - Metric: metrics.ArchiverArchiveLatency.GetMetricName(), - Tags: []metrics.Tag{metrics.StringTag("status", "err")}, - }, - { - Metric: metrics.ArchiverArchiveTargetLatency.GetMetricName(), - Tags: []metrics.Tag{ - metrics.StringTag("target", "history"), - metrics.StringTag("status", "err"), - }, - }, - }, ExpectedReturnErrors: []string{"example archive history err"}, }, { Name: "Visibility archival succeeds", Targets: []Target{TargetVisibility}, - ExpectedMetrics: []expectedMetric{ - { - Metric: metrics.ArchiverArchiveLatency.GetMetricName(), - Tags: []metrics.Tag{metrics.StringTag("status", "ok")}, - }, - { - Metric: metrics.ArchiverArchiveTargetLatency.GetMetricName(), - Tags: []metrics.Tag{ - metrics.StringTag("target", "visibility"), - metrics.StringTag("status", "ok"), - }, - }, - }, ExpectArchiveVisibility: true, }, { @@ -152,26 +106,6 @@ func TestArchiver(t *testing.T) { ExpectArchiveHistory: true, ExpectArchiveVisibility: true, ExpectedReturnErrors: []string{"example archive visibility error"}, - ExpectedMetrics: []expectedMetric{ - { - Metric: metrics.ArchiverArchiveLatency.GetMetricName(), - Tags: []metrics.Tag{metrics.StringTag("status", "err")}, - }, - { - Metric: metrics.ArchiverArchiveTargetLatency.GetMetricName(), - Tags: []metrics.Tag{ - metrics.StringTag("target", "history"), - metrics.StringTag("status", "ok"), - }, - }, - { - Metric: metrics.ArchiverArchiveTargetLatency.GetMetricName(), - Tags: []metrics.Tag{ - metrics.StringTag("target", "visibility"), - metrics.StringTag("status", "err"), - }, - }, - }, }, { Name: "Visibility succeeds but history fails", @@ -181,26 +115,6 @@ func TestArchiver(t *testing.T) { ExpectArchiveHistory: true, ExpectArchiveVisibility: true, ExpectedReturnErrors: []string{"example archive history error"}, - ExpectedMetrics: []expectedMetric{ - { - Metric: metrics.ArchiverArchiveLatency.GetMetricName(), - Tags: []metrics.Tag{metrics.StringTag("status", "err")}, - }, - { - Metric: metrics.ArchiverArchiveTargetLatency.GetMetricName(), - Tags: []metrics.Tag{ - metrics.StringTag("target", "history"), - metrics.StringTag("status", "err"), - }, - }, - { - Metric: metrics.ArchiverArchiveTargetLatency.GetMetricName(), - Tags: []metrics.Tag{ - metrics.StringTag("target", "visibility"), - metrics.StringTag("status", "ok"), - }, - }, - }, }, { Name: "Both targets succeed", @@ -208,26 +122,6 @@ func TestArchiver(t *testing.T) { ExpectArchiveHistory: true, ExpectArchiveVisibility: true, - ExpectedMetrics: []expectedMetric{ - { - Metric: metrics.ArchiverArchiveLatency.GetMetricName(), - Tags: []metrics.Tag{metrics.StringTag("status", "ok")}, - }, - { - Metric: metrics.ArchiverArchiveTargetLatency.GetMetricName(), - Tags: []metrics.Tag{ - metrics.StringTag("target", "history"), - metrics.StringTag("status", "ok"), - }, - }, - { - Metric: metrics.ArchiverArchiveTargetLatency.GetMetricName(), - Tags: []metrics.Tag{ - metrics.StringTag("target", "visibility"), - metrics.StringTag("status", "ok"), - }, - }, - }, }, { Name: "Rate limit hit", @@ -237,58 +131,94 @@ func TestArchiver(t *testing.T) { ExpectedReturnErrors: []string{ "archival rate limited: didn't acquire rate limiter tokens in time", }, - ExpectedMetrics: []expectedMetric{ - { - Metric: metrics.ArchiverArchiveLatency.GetMetricName(), - Tags: []metrics.Tag{metrics.StringTag("status", "rate_limit_exceeded")}, - }, + }, + { + Name: "Search attribute with no embedded type information", + Targets: []Target{TargetVisibility}, + SearchAttributes: &common.SearchAttributes{IndexedFields: map[string]*common.Payload{ + "Text01": payload.EncodeString("value"), + }}, + NameTypeMap: searchattribute.TestNameTypeMap, + + ExpectArchiveVisibility: true, + }, + { + Name: "Search attribute missing in type map", + Targets: []Target{TargetVisibility}, + SearchAttributes: &common.SearchAttributes{IndexedFields: map[string]*common.Payload{ + "Text01": payload.EncodeString("value"), + }}, + NameTypeMap: searchattribute.NameTypeMap{}, + + ExpectedReturnErrors: []string{ + "invalid search attribute type: Unspecified", + }, + }, + { + Name: "Error getting name type map from search attribute provider", + Targets: []Target{TargetVisibility}, + SearchAttributes: &common.SearchAttributes{IndexedFields: map[string]*common.Payload{ + "Text01": payload.EncodeString("value"), + }}, + NameTypeMapErr: errors.New("name-type-map-err"), + + ExpectedReturnErrors: []string{ + "name-type-map-err", }, }, } { c := c // capture range variable t.Run(c.Name, func(t *testing.T) { t.Parallel() + ctx := context.Background() controller := gomock.NewController(t) archiverProvider := provider.NewMockArchiverProvider(controller) historyArchiver := carchiver.NewMockHistoryArchiver(controller) visibilityArchiver := carchiver.NewMockVisibilityArchiver(controller) - metricsHandler := metrics.NewMockHandler(controller) - metricsHandler.EXPECT().WithTags(metrics.OperationTag(metrics.ArchiverClientScope)).Return(metricsHandler) + metricsHandler := metrics.NoopMetricsHandler sdkClient := mocksdk.NewMockClient(controller) sdkClientFactory := sdk.NewMockClientFactory(controller) sdkClientFactory.EXPECT().GetSystemClient().Return(sdkClient).AnyTimes() - logRecorder := &errorLogRecorder{ - T: t, - Logger: log.NewNoopLogger(), - } - for _, m := range c.ExpectedMetrics { - metricsHandler.EXPECT().Timer(m.Metric).Return(metrics.NoopTimerMetricFunc) - } + historyURI, err := carchiver.NewURI("test:///history/archival") require.NoError(t, err) + if c.ExpectArchiveHistory { archiverProvider.EXPECT().GetHistoryArchiver(gomock.Any(), gomock.Any()).Return(historyArchiver, nil) historyArchiver.EXPECT().Archive(gomock.Any(), historyURI, gomock.Any()).Return(c.ArchiveHistoryErr) } + visibilityURI, err := carchiver.NewURI("test:///visibility/archival") require.NoError(t, err) + archiverProvider.EXPECT().GetVisibilityArchiver(gomock.Any(), gomock.Any()). + Return(visibilityArchiver, nil).AnyTimes() + if c.ExpectArchiveVisibility { - archiverProvider.EXPECT().GetVisibilityArchiver(gomock.Any(), gomock.Any()). - Return(visibilityArchiver, nil) visibilityArchiver.EXPECT().Archive(gomock.Any(), visibilityURI, gomock.Any()). Return(c.ArchiveVisibilityErr) } + rateLimiter := quotas.NewMockRateLimiter(controller) rateLimiter.EXPECT().WaitN(gomock.Any(), len(c.Targets)).Return(c.RateLimiterWaitErr) + searchAttributeProvider := searchattribute.NewMockProvider(controller) + searchAttributeProvider.EXPECT().GetSearchAttributes(gomock.Any(), gomock.Any()).Return( + c.NameTypeMap, c.NameTypeMapErr, + ).AnyTimes() + + visibilityManager := manager.NewMockVisibilityManager(controller) + visibilityManager.EXPECT().GetIndexName().Return("index-name").AnyTimes() + // we need this channel to get the Archiver which is created asynchronously archivers := make(chan Archiver, 1) // we make an app here so that we can test that the Module is working as intended app := fx.New( fx.Supply(fx.Annotate(archiverProvider, fx.As(new(provider.ArchiverProvider)))), - fx.Supply(fx.Annotate(logRecorder, fx.As(new(log.Logger)))), + fx.Supply(fx.Annotate(log.NewNoopLogger(), fx.As(new(log.Logger)))), fx.Supply(fx.Annotate(metricsHandler, fx.As(new(metrics.Handler)))), + fx.Supply(fx.Annotate(searchAttributeProvider, fx.As(new(searchattribute.Provider)))), + fx.Supply(fx.Annotate(visibilityManager, fx.As(new(manager.VisibilityManager)))), fx.Supply(&configs.Config{ ArchivalBackendMaxRPS: func() float64 { return 42.0 @@ -310,56 +240,30 @@ func TestArchiver(t *testing.T) { require.NoError(t, app.Err()) // we need to start the app for fx.Invoke to be called, so that we can get the Archiver require.NoError(t, app.Start(ctx)) + defer func() { require.NoError(t, app.Stop(ctx)) }() + archiver := <-archivers + searchAttributes := c.SearchAttributes _, err = archiver.Archive(ctx, &Request{ - HistoryURI: historyURI, - VisibilityURI: visibilityURI, - Targets: c.Targets, + HistoryURI: historyURI, + VisibilityURI: visibilityURI, + Targets: c.Targets, + SearchAttributes: searchAttributes, }) if len(c.ExpectedReturnErrors) > 0 { require.Error(t, err) assert.Len(t, multierr.Errors(err), len(c.ExpectedReturnErrors)) + for _, e := range c.ExpectedReturnErrors { assert.Contains(t, err.Error(), e) } } else { assert.NoError(t, err) } - - var expectedLogErrs []string - if c.ArchiveHistoryErr != nil { - expectedLogErrs = append(expectedLogErrs, c.ArchiveHistoryErr.Error()) - } - if c.ArchiveVisibilityErr != nil { - expectedLogErrs = append(expectedLogErrs, c.ArchiveVisibilityErr.Error()) - } - assert.ElementsMatch(t, expectedLogErrs, logRecorder.ErrorMessages) }) } } - -type expectedMetric struct { - Metric string - Tags []metrics.Tag -} - -type errorLogRecorder struct { - log.Logger - T testing.TB - ErrorMessages []string -} - -func (r *errorLogRecorder) Error(msg string, tags ...tag.Tag) { - for _, t := range tags { - if t.Key() == "error" { - value := t.Value() - message, ok := value.(string) - require.True(r.T, ok) - r.ErrorMessages = append(r.ErrorMessages, message) - } - } -}