Skip to content

Commit

Permalink
Add an archival queue processor
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Nov 4, 2022
1 parent 2d76111 commit 859f3d9
Show file tree
Hide file tree
Showing 5 changed files with 434 additions and 2 deletions.
111 changes: 111 additions & 0 deletions service/history/archivalQueueFactory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package history

import (
"go.uber.org/fx"

"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/service/history/archival"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/workflow"
)

const (
archivalQueuePersistenceMaxRPSRatio = 0.15
)

type (
archivalQueueFactoryParams struct {
fx.In

QueueFactoryBaseParams
}

archivalQueueFactory struct {
*archivalQueueFactoryParams
QueueFactoryBase
archiver archival.Archiver
}
)

func NewArchivalQueueFactory(
params *archivalQueueFactoryParams,
) QueueFactory {
hostScheduler := queues.NewNamespacePriorityScheduler(
params.ClusterMetadata.GetCurrentClusterName(),
queues.NamespacePrioritySchedulerOptions{
WorkerCount: params.Config.ArchivalProcessorSchedulerWorkerCount,
// we don't need standby weights because we only run in the active cluster
ActiveNamespaceWeights: params.Config.ArchivalProcessorSchedulerRoundRobinWeights,
},
params.NamespaceRegistry,
params.TimeSource,
params.MetricsHandler.WithTags(metrics.OperationTag(queues.OperationArchivalQueueProcessor)),
params.Logger,
)
return &archivalQueueFactory{
archivalQueueFactoryParams: params,
QueueFactoryBase: QueueFactoryBase{
HostScheduler: hostScheduler,
HostPriorityAssigner: queues.NewPriorityAssigner(),
HostRateLimiter: NewQueueHostRateLimiter(
params.Config.ArchivalProcessorMaxPollHostRPS,
params.Config.PersistenceMaxQPS,
archivalQueuePersistenceMaxRPSRatio,
),
HostReaderRateLimiter: queues.NewReaderPriorityRateLimiter(
NewHostRateLimiterRateFn(
params.Config.ArchivalProcessorMaxPollHostRPS,
params.Config.PersistenceMaxQPS,
archivalQueuePersistenceMaxRPSRatio,
),
params.Config.QueueMaxReaderCount(),
),
},
}
}

func (f *archivalQueueFactory) CreateQueue(
shard shard.Context,
workflowCache workflow.Cache,
) queues.Queue {
logger := log.With(shard.GetLogger(), tag.ComponentArchivalQueue)

executor := newArchivalQueueTaskExecutor(f.archiver, shard, workflowCache, f.MetricsHandler, f.Logger)

return queues.NewImmediateQueue(
shard,
tasks.CategoryArchival,
f.HostScheduler,
f.HostPriorityAssigner,
executor,
&queues.Options{
ReaderOptions: queues.ReaderOptions{
BatchSize: f.Config.ArchivalTaskBatchSize,
MaxPendingTasksCount: f.Config.QueuePendingTaskMaxCount,
PollBackoffInterval: f.Config.ArchivalProcessorPollBackoffInterval,
},
MonitorOptions: queues.MonitorOptions{
PendingTasksCriticalCount: f.Config.QueuePendingTaskCriticalCount,
ReaderStuckCriticalAttempts: f.Config.QueueReaderStuckCriticalAttempts,
SliceCountCriticalThreshold: f.Config.QueueCriticalSlicesCount,
},
MaxPollRPS: f.Config.ArchivalProcessorMaxPollRPS,
MaxPollInterval: f.Config.ArchivalProcessorMaxPollInterval,
MaxPollIntervalJitterCoefficient: f.Config.ArchivalProcessorMaxPollIntervalJitterCoefficient,
CheckpointInterval: f.Config.ArchivalProcessorUpdateAckInterval,
CheckpointIntervalJitterCoefficient: f.Config.ArchivalProcessorUpdateAckIntervalJitterCoefficient,
MaxReaderCount: f.Config.QueueMaxReaderCount,
TaskMaxRetryCount: func() int {
// virtually infinite retries
return 99999
},
},
f.HostReaderRateLimiter,
logger,
f.MetricsHandler.WithTags(metrics.OperationTag(queues.OperationArchivalQueueProcessor)),
)
}
5 changes: 5 additions & 0 deletions service/history/archivalQueueProcessor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package history

type archivalQueueProcessor struct {
queueProcessorBase
}
139 changes: 139 additions & 0 deletions service/history/archivalQueueTaskExecutor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package history

import (
"context"
"errors"
"fmt"
"time"

common2 "go.temporal.io/server/common"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/service/history/archival"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/workflow"
)

type archivalQueueTaskExecutor struct {
archiver archival.Archiver
shardContext shard.Context
workflowCache workflow.Cache
logger log.Logger
metricsClient metrics.MetricsHandler
}

func newArchivalQueueTaskExecutor(archiver archival.Archiver, shardContext shard.Context, workflowCache workflow.Cache,
metricsHandler metrics.MetricsHandler, logger log.Logger) *archivalQueueTaskExecutor {
return &archivalQueueTaskExecutor{
archiver: archiver,
shardContext: shardContext,
workflowCache: workflowCache,
logger: logger,
metricsClient: metricsHandler,
}
}

func (e *archivalQueueTaskExecutor) Execute(ctx context.Context, executable queues.Executable) (tags []metrics.Tag,
isActive bool, err error) {
task := executable.GetTask()
taskType := queues.GetArchivalTaskTypeTagValue(task)
tags = []metrics.Tag{
getNamespaceTagByID(e.shardContext.GetNamespaceRegistry(), task.GetNamespaceID()),
metrics.TaskTypeTag(taskType),
metrics.OperationTag(taskType), // for backward compatibility
}
switch task := task.(type) {
case *tasks.ArchiveExecutionTask:
err = e.processArchiveExecutionTask(ctx, task)
default:
err = fmt.Errorf("task with invalid type sent to archival queue: %+v", task)
}
return tags, true, err
}

func (e *archivalQueueTaskExecutor) processArchiveExecutionTask(ctx context.Context,
task *tasks.ArchiveExecutionTask) (err error) {
weContext, release, err := getWorkflowExecutionContextForTask(ctx, e.workflowCache, task)
if err != nil {
return err
}
defer func() { release(err) }()
// TODO: verify task version?

mutableState, err := weContext.LoadMutableState(ctx)
if err != nil {
return err
}
if mutableState == nil || mutableState.IsWorkflowExecutionRunning() {
return errors.New("cannot archive workflow which is running")
}
branchToken, err := mutableState.GetCurrentBranchToken()
if err != nil {
return err
}

namespaceEntry := mutableState.GetNamespaceEntry()
namespaceName := namespaceEntry.Name()
nextEventID := mutableState.GetNextEventID()
closeFailoverVersion, err := mutableState.GetLastWriteVersion()
if err != nil {
return err
}

executionInfo := mutableState.GetExecutionInfo()
workflowTypeName := executionInfo.GetWorkflowTypeName()
startTime := executionInfo.GetStartTime()
if startTime == nil {
return errors.New("can't archive workflow with nil start time")
}
executionTime := executionInfo.GetExecutionTime()
if executionTime == nil {
return errors.New("can't archive workflow with nil execution time")
}
closeTime := executionInfo.GetCloseTime()
if closeTime == nil {
return errors.New("can't archive workflow with nil close time")
}
executionState := mutableState.GetExecutionState()
memo := getWorkflowMemo(copyMemo(executionInfo.Memo))
_, err = e.archiver.Archive(ctx, &archival.Request{
ShardID: e.shardContext.GetShardID(),
NamespaceID: task.NamespaceID,
Namespace: namespaceName.String(),
WorkflowID: task.WorkflowID,
RunID: task.RunID,
BranchToken: branchToken,
NextEventID: nextEventID,
CloseFailoverVersion: closeFailoverVersion,
HistoryURI: namespaceEntry.HistoryArchivalState().URI,
WorkflowTypeName: workflowTypeName,
StartTime: *startTime,
ExecutionTime: *executionTime,
CloseTime: *closeTime,
Status: executionState.Status,
HistoryLength: nextEventID - 1,
Memo: memo,
SearchAttributes: getSearchAttributes(copySearchAttributes(executionInfo.SearchAttributes)),
VisibilityURI: namespaceEntry.VisibilityArchivalState().URI,
Targets: []archival.Target{archival.TargetHistory, archival.TargetVisibility},
CallerService: common2.HistoryServiceName,
})
if err != nil {
return err
}
retention := namespaceEntry.Retention()
if retention == 0 {
retention = 7 * 24 * time.Hour
}
deleteTime := closeTime.Add(retention)
mutableState.AddTasks(&tasks.DeleteHistoryEventTask{
WorkflowKey: task.WorkflowKey,
VisibilityTimestamp: deleteTime,
Version: task.Version,
BranchToken: branchToken,
WorkflowDataAlreadyArchived: true,
})
return err
}
Loading

0 comments on commit 859f3d9

Please sign in to comment.