Skip to content

Commit

Permalink
Add some config values for durable archival
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Nov 3, 2022
1 parent 176cd42 commit d4444cb
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 0 deletions.
26 changes: 26 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,32 @@ const (
// VisibilityProcessorEnsureCloseBeforeDelete means we ensure the visibility of an execution is closed before we delete its visibility records
VisibilityProcessorEnsureCloseBeforeDelete = "history.transferProcessorEnsureCloseBeforeDelete"

// ArchivalTaskBatchSize is batch size for archivalQueueProcessor
ArchivalTaskBatchSize = "history.archivalTaskBatchSize"
// ArchivalProcessorMaxPollRPS is max poll rate per second for archivalQueueProcessor
ArchivalProcessorMaxPollRPS = "history.archivalProcessorMaxPollRPS"
// ArchivalProcessorMaxPollHostRPS is max poll rate per second for all archivalQueueProcessor on a host
ArchivalProcessorMaxPollHostRPS = "history.archivalProcessorMaxPollHostRPS"
// ArchivalTaskMaxRetryCount is max times of retry for archivalQueueProcessor
ArchivalTaskMaxRetryCount = "history.archivalTaskMaxRetryCount"
// ArchivalProcessorSchedulerWorkerCount is the number of workers in the host level task scheduler for
// archivalQueueProcessor
ArchivalProcessorSchedulerWorkerCount = "history.archivalProcessorSchedulerWorkerCount"
// ArchivalProcessorSchedulerRoundRobinWeights is the priority round robin weights by archival task scheduler for
// all namespaces
ArchivalProcessorSchedulerRoundRobinWeights = "history.archivalProcessorSchedulerRoundRobinWeights"
// ArchivalProcessorMaxPollInterval max poll interval for archivalQueueProcessor
ArchivalProcessorMaxPollInterval = "history.archivalProcessorMaxPollInterval"
// ArchivalProcessorMaxPollIntervalJitterCoefficient is the max poll interval jitter coefficient
ArchivalProcessorMaxPollIntervalJitterCoefficient = "history.archivalProcessorMaxPollIntervalJitterCoefficient"
// ArchivalProcessorUpdateAckInterval is update interval for archivalQueueProcessor
ArchivalProcessorUpdateAckInterval = "history.archivalProcessorUpdateAckInterval"
// ArchivalProcessorUpdateAckIntervalJitterCoefficient is the update interval jitter coefficient
ArchivalProcessorUpdateAckIntervalJitterCoefficient = "history.archivalProcessorUpdateAckIntervalJitterCoefficient"
// ArchivalProcessorPollBackoffInterval is the poll backoff interval if task redispatcher's size exceeds limit for
// archivalQueueProcessor
ArchivalProcessorPollBackoffInterval = "history.archivalProcessorPollBackoffInterval"

// ReplicatorTaskBatchSize is batch size for ReplicatorProcessor
ReplicatorTaskBatchSize = "history.replicatorTaskBatchSize"
// ReplicatorTaskWorkerCount is number of worker for ReplicatorProcessor
Expand Down
1 change: 1 addition & 0 deletions common/log/tag/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ var (
ComponentEventsCache = component("events-cache")
ComponentTransferQueue = component("transfer-queue-processor")
ComponentVisibilityQueue = component("visibility-queue-processor")
ComponentArchivalQueue = component("archival-queue-processor")
ComponentTimerQueue = component("timer-queue-processor")
ComponentTimerBuilder = component("timer-builder")
ComponentReplicatorQueue = component("replicator-queue-processor")
Expand Down
26 changes: 26 additions & 0 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,18 @@ type Config struct {
EnableCrossNamespaceCommands dynamicconfig.BoolPropertyFn
EnableActivityEagerExecution dynamicconfig.BoolPropertyFnWithNamespaceFilter
NamespaceCacheRefreshInterval dynamicconfig.DurationPropertyFn

// ArchivalQueueProcessor settings
ArchivalProcessorSchedulerWorkerCount dynamicconfig.IntPropertyFn
ArchivalProcessorSchedulerRoundRobinWeights dynamicconfig.MapPropertyFnWithNamespaceFilter
ArchivalProcessorMaxPollHostRPS dynamicconfig.IntPropertyFn
ArchivalTaskBatchSize dynamicconfig.IntPropertyFn
ArchivalProcessorPollBackoffInterval dynamicconfig.DurationPropertyFn
ArchivalProcessorMaxPollRPS dynamicconfig.IntPropertyFn
ArchivalProcessorMaxPollInterval dynamicconfig.DurationPropertyFn
ArchivalProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
ArchivalProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn
ArchivalProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
}

const (
Expand Down Expand Up @@ -486,6 +498,20 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
EnableCrossNamespaceCommands: dc.GetBoolProperty(dynamicconfig.EnableCrossNamespaceCommands, true),
EnableActivityEagerExecution: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableActivityEagerExecution, false),
NamespaceCacheRefreshInterval: dc.GetDurationProperty(dynamicconfig.NamespaceCacheRefreshInterval, 10*time.Second),

// Archival related
ArchivalTaskBatchSize: dc.GetIntProperty(dynamicconfig.ArchivalTaskBatchSize, 100),
ArchivalProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.ArchivalProcessorMaxPollRPS, 20),
ArchivalProcessorMaxPollHostRPS: dc.GetIntProperty(dynamicconfig.ArchivalProcessorMaxPollHostRPS, 0),
ArchivalProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.ArchivalProcessorSchedulerWorkerCount, 512),
ArchivalProcessorSchedulerRoundRobinWeights: dc.GetMapPropertyFnWithNamespaceFilter(dynamicconfig.ArchivalProcessorSchedulerRoundRobinWeights, ConvertWeightsToDynamicConfigValue(DefaultActiveTaskPriorityWeight)),
ArchivalProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorMaxPollInterval, 1*time.Minute),
ArchivalProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.
ArchivalProcessorMaxPollIntervalJitterCoefficient, 0.15),
ArchivalProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorUpdateAckInterval, 30*time.Second),
ArchivalProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.
ArchivalProcessorUpdateAckIntervalJitterCoefficient, 0.15),
ArchivalProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorPollBackoffInterval, 5*time.Second),
}

return cfg
Expand Down
13 changes: 13 additions & 0 deletions service/history/queues/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ const (
OperationTransferActiveQueueProcessor = "TransferActiveQueueProcessor"
OperationTransferStandbyQueueProcessor = "TransferStandbyQueueProcessor"
OperationVisibilityQueueProcessor = "VisibilityQueueProcessor"
OperationArchivalQueueProcessor = "ArchivalQueueProcessor"
)

// Task type tag value for active and standby tasks
Expand All @@ -100,6 +101,7 @@ const (
TaskTypeVisibilityTaskUpsertExecution = "VisibilityTaskUpsertExecution"
TaskTypeVisibilityTaskCloseExecution = "VisibilityTaskCloseExecution"
TaskTypeVisibilityTaskDeleteExecution = "VisibilityTaskDeleteExecution"
TaskTypeArchivalTaskArchiveExecution = "ArchivalTaskArchiveExecution"
TaskTypeTimerActiveTaskActivityTimeout = "TimerActiveTaskActivityTimeout"
TaskTypeTimerActiveTaskWorkflowTaskTimeout = "TimerActiveTaskWorkflowTaskTimeout"
TaskTypeTimerActiveTaskUserTimer = "TimerActiveTaskUserTimer"
Expand Down Expand Up @@ -224,3 +226,14 @@ func GetVisibilityTaskTypeTagValue(
return ""
}
}

func GetArchivalTaskTypeTagValue(
task tasks.Task,
) string {
switch task.(type) {
case *tasks.ArchiveExecutionTask:
return TaskTypeArchivalTaskArchiveExecution
default:
return ""
}
}
38 changes: 38 additions & 0 deletions service/history/queues/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package queues

import (
"testing"

"github.com/stretchr/testify/assert"

"go.temporal.io/server/service/history/tasks"
)

func TestGetArchivalTaskTypeTagValue(t *testing.T) {
assert.Equal(t, "ArchivalTaskArchiveExecution", GetArchivalTaskTypeTagValue(&tasks.ArchiveExecutionTask{}))
assert.Equal(t, "", GetArchivalTaskTypeTagValue(&tasks.CloseExecutionTask{}))
}

0 comments on commit d4444cb

Please sign in to comment.