Skip to content

Commit

Permalink
Restructure replication task implementation & UT (#3992)
Browse files Browse the repository at this point in the history
* Restructure replication task implementation & UT for ease of management
  • Loading branch information
wxing1292 authored Mar 7, 2023
1 parent e4162e8 commit dc69278
Show file tree
Hide file tree
Showing 19 changed files with 2,723 additions and 7 deletions.
16 changes: 11 additions & 5 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ const (
AdminRemoveRemoteClusterScope = "AdminRemoveRemoteCluster"
// AdminDeleteWorkflowExecutionScope is the metric scope for admin.AdminDeleteWorkflowExecution
AdminDeleteWorkflowExecutionScope = "AdminDeleteWorkflowExecution"
// AdminStreamReplicationMessagesScope is the metric scope for admin.AdminStreamReplicationMessages
AdminStreamReplicationMessagesScope = "AdminStreamReplicationMessages"

// OperatorAddSearchAttributesScope is the metric scope for operator.AddSearchAttributes
OperatorAddSearchAttributesScope
Expand Down Expand Up @@ -433,6 +435,8 @@ const (
HistoryClientDescribeHistoryHostScope = "HistoryClientDescribeHistoryHost"
// HistoryClientGetReplicationMessagesScope tracks RPC calls to history service
HistoryClientGetReplicationMessagesScope = "HistoryClientGetReplicationMessages"
// HistoryClientStreamReplicationMessagesScope tracks RPC calls to history service
HistoryClientStreamReplicationMessagesScope = "HistoryClientStreamReplicationMessages"
)

// Matching Client Operations
Expand Down Expand Up @@ -1136,12 +1140,14 @@ const (
HistoryMetadataReplicationTaskScope = "HistoryMetadataReplicationTask"
// SyncShardTaskScope is the scope used by sync shrad information processing
SyncShardTaskScope = "SyncShardTask"
// SyncActivityTaskScope is the scope used by sync activity information processing
// SyncActivityTaskScope is the scope used by sync activity
SyncActivityTaskScope = "SyncActivityTask"
// ESProcessorScope is scope used by all metric emitted by esProcessor
ESProcessorScope = "ESProcessor"
// IndexProcessorScope is scope used by all metric emitted by index processor
IndexProcessorScope = "IndexProcessor"
// SyncWorkflowTaskScope is the scope used by sync workflow
SyncWorkflowTaskScope = "SyncWorkflowTask"
// NoopTaskScope is the scope used by noop task
NoopTaskScope = "NoopTask"
// UnknownTaskScope is the scope used by unknown task
UnknownTaskScope = "UnknownTask"
// ParentClosePolicyProcessorScope is scope used by all metrics emitted by worker.ParentClosePolicyProcessor
ParentClosePolicyProcessorScope = "ParentClosePolicyProcessor"
DeleteNamespaceWorkflowScope = "DeleteNamespaceWorkflow"
Expand Down
2 changes: 1 addition & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ import (
"go.temporal.io/server/service/history/api/verifychildworkflowcompletionrecorded"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/consts"
deletemanager "go.temporal.io/server/service/history/deletemanager"
"go.temporal.io/server/service/history/deletemanager"
"go.temporal.io/server/service/history/events"
"go.temporal.io/server/service/history/ndc"
"go.temporal.io/server/service/history/queues"
Expand Down
143 changes: 143 additions & 0 deletions service/history/replication/executable_activity_state_task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package replication

import (
"time"

"go.temporal.io/api/serviceerror"

"go.temporal.io/server/api/historyservice/v1"
replicationspb "go.temporal.io/server/api/replication/v1"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
serviceerrors "go.temporal.io/server/common/serviceerror"
ctasks "go.temporal.io/server/common/tasks"
)

type (
ExecutableActivityStateTask struct {
ProcessToolBox

definition.WorkflowKey
ExecutableTask
req *historyservice.SyncActivityRequest

// variables to be perhaps removed (not essential to logic)
sourceClusterName string
}
)

var _ ctasks.Task = (*ExecutableActivityStateTask)(nil)
var _ TrackableExecutableTask = (*ExecutableActivityStateTask)(nil)

func NewExecutableActivityStateTask(
processToolBox ProcessToolBox,
taskID int64,
taskCreationTime time.Time,
task *replicationspb.SyncActivityTaskAttributes,
sourceClusterName string,
) *ExecutableActivityStateTask {
return &ExecutableActivityStateTask{
ProcessToolBox: processToolBox,

WorkflowKey: definition.NewWorkflowKey(task.NamespaceId, task.WorkflowId, task.RunId),
ExecutableTask: NewExecutableTask(
processToolBox,
taskID,
metrics.SyncActivityTaskScope,
taskCreationTime,
time.Now().UTC(),
),
req: &historyservice.SyncActivityRequest{
NamespaceId: task.NamespaceId,
WorkflowId: task.WorkflowId,
RunId: task.RunId,
Version: task.Version,
ScheduledEventId: task.ScheduledEventId,
ScheduledTime: task.ScheduledTime,
StartedEventId: task.StartedEventId,
StartedTime: task.StartedTime,
LastHeartbeatTime: task.LastHeartbeatTime,
Details: task.Details,
Attempt: task.Attempt,
LastFailure: task.LastFailure,
LastWorkerIdentity: task.LastWorkerIdentity,
VersionHistory: task.GetVersionHistory(),
},

sourceClusterName: sourceClusterName,
}
}

func (e *ExecutableActivityStateTask) Execute() error {
namespaceName, apply, nsError := e.GetNamespaceInfo(e.NamespaceID)
if nsError != nil {
return nsError
} else if !apply {
return nil
}
ctx, cancel := newTaskContext(namespaceName)
defer cancel()

shardContext, err := e.ShardController.GetShardByNamespaceWorkflow(
namespace.ID(e.NamespaceID),
e.WorkflowID,
)
if err != nil {
return err
}
engine, err := shardContext.GetEngine(ctx)
if err != nil {
return err
}
return engine.SyncActivity(ctx, e.req)
}

func (e *ExecutableActivityStateTask) HandleErr(err error) error {
switch retryErr := err.(type) {
case nil, *serviceerror.NotFound:
return nil
case *serviceerrors.RetryReplication:
namespaceName, _, nsError := e.GetNamespaceInfo(e.NamespaceID)
if nsError != nil {
return err
}
ctx, cancel := newTaskContext(namespaceName)
defer cancel()

if resendErr := e.Resend(
ctx,
e.sourceClusterName,
retryErr,
); resendErr != nil {
return err
}
return e.Execute()
default:
return err
}
}
Loading

0 comments on commit dc69278

Please sign in to comment.