From 22299b9aafcbec931f8f77e42890c2c321ba6f85 Mon Sep 17 00:00:00 2001 From: Michael Snowden Date: Mon, 19 Dec 2022 11:03:25 -0800 Subject: [PATCH] Add an archival.Module to be used in both tests and prod --- common/dynamicconfig/constants.go | 2 ++ service/history/archival/archiver_test.go | 35 +++++++++++++++++- service/history/archival/fx.go | 44 +++++++++++++++++++++++ service/history/configs/config.go | 2 ++ service/history/fx.go | 2 ++ service/history/workflow/fx.go | 1 + 6 files changed, 85 insertions(+), 1 deletion(-) create mode 100644 service/history/archival/fx.go diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index a3ad2eebda48..dc01d21063a0 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -522,6 +522,8 @@ const ( ArchivalProcessorArchiveDelay = "history.archivalProcessorArchiveDelay" // ArchivalProcessorRetryWarningLimit is the number of times an archival task may be retried before we log a warning ArchivalProcessorRetryWarningLimit = "history.archivalProcessorRetryLimitWarning" + // ArchivalBackendMaxRPS is the maximum rate of requests per second to the archival backend + ArchivalBackendMaxRPS = "history.archivalBackendMaxRPS" // ReplicatorTaskBatchSize is batch size for ReplicatorProcessor ReplicatorTaskBatchSize = "history.replicatorTaskBatchSize" diff --git a/service/history/archival/archiver_test.go b/service/history/archival/archiver_test.go index 8063b5a0eaef..296b250a276f 100644 --- a/service/history/archival/archiver_test.go +++ b/service/history/archival/archiver_test.go @@ -32,6 +32,7 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/fx" "go.uber.org/multierr" carchiver "go.temporal.io/server/common/archiver" @@ -42,6 +43,7 @@ import ( "go.temporal.io/server/common/quotas" "go.temporal.io/server/common/sdk" "go.temporal.io/server/common/testing/mocksdk" + "go.temporal.io/server/service/history/configs" ) func TestArchiver(t *testing.T) { @@ -280,7 +282,38 @@ func TestArchiver(t *testing.T) { rateLimiter := quotas.NewMockRateLimiter(controller) rateLimiter.EXPECT().WaitN(gomock.Any(), 2).Return(c.RateLimiterWaitErr) - archiver := NewArchiver(archiverProvider, logRecorder, metricsHandler, rateLimiter) + // we need this channel to get the Archiver which is created asynchronously + archivers := make(chan Archiver, 1) + // we make an app here so that we can test that the Module is working as intended + app := fx.New( + fx.Supply(fx.Annotate(archiverProvider, fx.As(new(provider.ArchiverProvider)))), + fx.Supply(fx.Annotate(logRecorder, fx.As(new(log.Logger)))), + fx.Supply(fx.Annotate(metricsHandler, fx.As(new(metrics.Handler)))), + fx.Supply(&configs.Config{ + ArchivalBackendMaxRPS: func() float64 { + return 42.0 + }, + }), + Module, + fx.Decorate(func(rl quotas.RateLimiter) quotas.RateLimiter { + // we need to decorate the rate limiter so that we can use the mock + // we also verify that the rate being used is equal to the one in the config + assert.Equal(t, 42.0, rl.Rate()) + return rateLimiter + }), + fx.Invoke(func(a Archiver) { + // after all parameters are provided, we get the Archiver and put it in the channel + // so that we can use it in the test + archivers <- a + }), + ) + require.NoError(t, app.Err()) + // we need to start the app for fx.Invoke to be called, so that we can get the Archiver + require.NoError(t, app.Start(ctx)) + defer func() { + require.NoError(t, app.Stop(ctx)) + }() + archiver := <-archivers _, err = archiver.Archive(ctx, &Request{ HistoryURI: historyURI, VisibilityURI: visibilityURI, diff --git a/service/history/archival/fx.go b/service/history/archival/fx.go new file mode 100644 index 000000000000..a9821f93e6ff --- /dev/null +++ b/service/history/archival/fx.go @@ -0,0 +1,44 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package archival + +import ( + "math" + + "go.uber.org/fx" + + "go.temporal.io/server/common/quotas" + "go.temporal.io/server/service/history/configs" +) + +var Module = fx.Options( + fx.Provide(NewArchiver), + fx.Provide(func(config *configs.Config) quotas.RateLimiter { + return quotas.NewRateLimiter( + config.ArchivalBackendMaxRPS(), + int(math.Ceil(config.ArchivalBackendMaxRPS())), + ) + }), +) diff --git a/service/history/configs/config.go b/service/history/configs/config.go index f6cdcd78a1e6..fe61ba1ab962 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -295,6 +295,7 @@ type Config struct { ArchivalProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn ArchivalProcessorArchiveDelay dynamicconfig.DurationPropertyFn ArchivalProcessorRetryWarningLimit dynamicconfig.IntPropertyFn + ArchivalBackendMaxRPS dynamicconfig.FloatPropertyFn } const ( @@ -525,6 +526,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis ArchivalProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorPollBackoffInterval, 5*time.Second), ArchivalProcessorArchiveDelay: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorArchiveDelay, 5*time.Minute), ArchivalProcessorRetryWarningLimit: dc.GetIntProperty(dynamicconfig.ArchivalProcessorRetryWarningLimit, 100), + ArchivalBackendMaxRPS: dc.GetFloat64Property(dynamicconfig.ArchivalBackendMaxRPS, 10000.0), } return cfg diff --git a/service/history/fx.go b/service/history/fx.go index e377c074fb1f..5718d1c3beba 100644 --- a/service/history/fx.go +++ b/service/history/fx.go @@ -54,6 +54,7 @@ import ( "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/service" "go.temporal.io/server/service/history/api" + "go.temporal.io/server/service/history/archival" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/events" @@ -68,6 +69,7 @@ var Module = fx.Options( workflow.Module, shard.Module, cache.Module, + archival.Module, fx.Provide(dynamicconfig.NewCollection), fx.Provide(ConfigProvider), // might be worth just using provider for configs.Config directly fx.Provide(RetryableInterceptorProvider), diff --git a/service/history/workflow/fx.go b/service/history/workflow/fx.go index 3af6f0ce37ab..02352bf4853b 100644 --- a/service/history/workflow/fx.go +++ b/service/history/workflow/fx.go @@ -30,4 +30,5 @@ import ( var Module = fx.Options( fx.Populate(&taskGeneratorProvider), + fx.Provide(NewRelocatableAttributesFetcher), )