diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index a3ad2eebda4..dc01d21063a 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -522,6 +522,8 @@ const ( ArchivalProcessorArchiveDelay = "history.archivalProcessorArchiveDelay" // ArchivalProcessorRetryWarningLimit is the number of times an archival task may be retried before we log a warning ArchivalProcessorRetryWarningLimit = "history.archivalProcessorRetryLimitWarning" + // ArchivalBackendMaxRPS is the maximum rate of requests per second to the archival backend + ArchivalBackendMaxRPS = "history.archivalBackendMaxRPS" // ReplicatorTaskBatchSize is batch size for ReplicatorProcessor ReplicatorTaskBatchSize = "history.replicatorTaskBatchSize" diff --git a/service/history/api/describeworkflow/api.go b/service/history/api/describeworkflow/api.go index ef7a87f9c22..fa1f98fd1a8 100644 --- a/service/history/api/describeworkflow/api.go +++ b/service/history/api/describeworkflow/api.go @@ -195,7 +195,7 @@ func Invoke( } } - relocatableAttributes, err := workflow.NewRelocatableAttributesFetcher(persistenceVisibilityMgr).Fetch(ctx, mutableState) + relocatableAttributes, err := workflow.RelocatableAttributesFetcherProvider(persistenceVisibilityMgr).Fetch(ctx, mutableState) if err != nil { shard.GetLogger().Error( "Failed to fetch relocatable attributes", diff --git a/service/history/archival/archiver_test.go b/service/history/archival/archiver_test.go index 8063b5a0eae..296b250a276 100644 --- a/service/history/archival/archiver_test.go +++ b/service/history/archival/archiver_test.go @@ -32,6 +32,7 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/fx" "go.uber.org/multierr" carchiver "go.temporal.io/server/common/archiver" @@ -42,6 +43,7 @@ import ( "go.temporal.io/server/common/quotas" "go.temporal.io/server/common/sdk" "go.temporal.io/server/common/testing/mocksdk" + "go.temporal.io/server/service/history/configs" ) func TestArchiver(t *testing.T) { @@ -280,7 +282,38 @@ func TestArchiver(t *testing.T) { rateLimiter := quotas.NewMockRateLimiter(controller) rateLimiter.EXPECT().WaitN(gomock.Any(), 2).Return(c.RateLimiterWaitErr) - archiver := NewArchiver(archiverProvider, logRecorder, metricsHandler, rateLimiter) + // we need this channel to get the Archiver which is created asynchronously + archivers := make(chan Archiver, 1) + // we make an app here so that we can test that the Module is working as intended + app := fx.New( + fx.Supply(fx.Annotate(archiverProvider, fx.As(new(provider.ArchiverProvider)))), + fx.Supply(fx.Annotate(logRecorder, fx.As(new(log.Logger)))), + fx.Supply(fx.Annotate(metricsHandler, fx.As(new(metrics.Handler)))), + fx.Supply(&configs.Config{ + ArchivalBackendMaxRPS: func() float64 { + return 42.0 + }, + }), + Module, + fx.Decorate(func(rl quotas.RateLimiter) quotas.RateLimiter { + // we need to decorate the rate limiter so that we can use the mock + // we also verify that the rate being used is equal to the one in the config + assert.Equal(t, 42.0, rl.Rate()) + return rateLimiter + }), + fx.Invoke(func(a Archiver) { + // after all parameters are provided, we get the Archiver and put it in the channel + // so that we can use it in the test + archivers <- a + }), + ) + require.NoError(t, app.Err()) + // we need to start the app for fx.Invoke to be called, so that we can get the Archiver + require.NoError(t, app.Start(ctx)) + defer func() { + require.NoError(t, app.Stop(ctx)) + }() + archiver := <-archivers _, err = archiver.Archive(ctx, &Request{ HistoryURI: historyURI, VisibilityURI: visibilityURI, diff --git a/service/history/archival/fx.go b/service/history/archival/fx.go new file mode 100644 index 00000000000..f21e209c463 --- /dev/null +++ b/service/history/archival/fx.go @@ -0,0 +1,39 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package archival + +import ( + "go.uber.org/fx" + + "go.temporal.io/server/common/quotas" + "go.temporal.io/server/service/history/configs" +) + +var Module = fx.Options( + fx.Provide(NewArchiver), + fx.Provide(func(config *configs.Config) quotas.RateLimiter { + return quotas.NewDefaultOutgoingRateLimiter(quotas.RateFn(config.ArchivalBackendMaxRPS)) + }), +) diff --git a/service/history/archival_queue_task_executor_test.go b/service/history/archival_queue_task_executor_test.go index 86832196b42..9fb65d402cb 100644 --- a/service/history/archival_queue_task_executor_test.go +++ b/service/history/archival_queue_task_executor_test.go @@ -496,7 +496,7 @@ func TestArchivalQueueTaskExecutor(t *testing.T) { a, shardContext, workflowCache, - workflow.NewRelocatableAttributesFetcher(visibilityManager), + workflow.RelocatableAttributesFetcherProvider(visibilityManager), p.MetricsHandler, logger, ) diff --git a/service/history/configs/config.go b/service/history/configs/config.go index cc52e5cf7e9..9e9dd91c736 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -295,6 +295,7 @@ type Config struct { ArchivalProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn ArchivalProcessorArchiveDelay dynamicconfig.DurationPropertyFn ArchivalProcessorRetryWarningLimit dynamicconfig.IntPropertyFn + ArchivalBackendMaxRPS dynamicconfig.FloatPropertyFn } const ( @@ -525,6 +526,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis ArchivalProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorPollBackoffInterval, 5*time.Second), ArchivalProcessorArchiveDelay: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorArchiveDelay, 5*time.Minute), ArchivalProcessorRetryWarningLimit: dc.GetIntProperty(dynamicconfig.ArchivalProcessorRetryWarningLimit, 100), + ArchivalBackendMaxRPS: dc.GetFloat64Property(dynamicconfig.ArchivalBackendMaxRPS, 10000.0), } return cfg diff --git a/service/history/fx.go b/service/history/fx.go index e377c074fb1..5718d1c3beb 100644 --- a/service/history/fx.go +++ b/service/history/fx.go @@ -54,6 +54,7 @@ import ( "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/service" "go.temporal.io/server/service/history/api" + "go.temporal.io/server/service/history/archival" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/events" @@ -68,6 +69,7 @@ var Module = fx.Options( workflow.Module, shard.Module, cache.Module, + archival.Module, fx.Provide(dynamicconfig.NewCollection), fx.Provide(ConfigProvider), // might be worth just using provider for configs.Config directly fx.Provide(RetryableInterceptorProvider), diff --git a/service/history/workflow/fx.go b/service/history/workflow/fx.go index 3af6f0ce37a..a018b880fee 100644 --- a/service/history/workflow/fx.go +++ b/service/history/workflow/fx.go @@ -30,4 +30,5 @@ import ( var Module = fx.Options( fx.Populate(&taskGeneratorProvider), + fx.Provide(RelocatableAttributesFetcherProvider), ) diff --git a/service/history/workflow/relocatable_attributes_fetcher.go b/service/history/workflow/relocatable_attributes_fetcher.go index 2ff0c1bb51f..baf3877add8 100644 --- a/service/history/workflow/relocatable_attributes_fetcher.go +++ b/service/history/workflow/relocatable_attributes_fetcher.go @@ -41,7 +41,7 @@ type RelocatableAttributesFetcher interface { ) (*RelocatableAttributes, error) } -// NewRelocatableAttributesFetcher creates a new instance of a RelocatableAttributesFetcher. +// RelocatableAttributesFetcherProvider provides a new instance of a RelocatableAttributesFetcher. // The manager.VisibilityManager parameter is used to fetch the relocatable attributes from the persistence backend iff // we already moved them there out from the mutable state. // The visibility manager is not used if the relocatable attributes are still in the mutable state. @@ -52,7 +52,7 @@ type RelocatableAttributesFetcher interface { // Currently, there is no cache, but you may provide a manager.VisibilityManager that supports caching to this function // safely. // TODO: Add a cache around the visibility manager for the relocatable attributes. -func NewRelocatableAttributesFetcher( +func RelocatableAttributesFetcherProvider( visibilityManager manager.VisibilityManager, ) RelocatableAttributesFetcher { return &relocatableAttributesFetcher{ diff --git a/service/history/workflow/relocatable_attributes_fetcher_test.go b/service/history/workflow/relocatable_attributes_fetcher_test.go index 9ad390b4114..d3799f2d63d 100644 --- a/service/history/workflow/relocatable_attributes_fetcher_test.go +++ b/service/history/workflow/relocatable_attributes_fetcher_test.go @@ -126,7 +126,7 @@ func TestRelocatableAttributesFetcher_Fetch(t *testing.T) { } ctx := context.Background() - fetcher := NewRelocatableAttributesFetcher(visibilityManager) + fetcher := RelocatableAttributesFetcherProvider(visibilityManager) info, err := fetcher.Fetch(ctx, mutableState) if c.ExpectedErr != nil {