Add poison pill support for replication stream (#4116)
* Add poison pill support for replication stream & UT
wxing1292 committed Mar 31, 2023
1 parent 81ae1c3 commit 0b86bc8
Showing 17 changed files with 393 additions and 27 deletions.
2 changes: 2 additions & 0 deletions common/dynamicconfig/constants.go
@@ -652,6 +652,8 @@ const (
ReplicationTaskProcessorShardQPS = "history.ReplicationTaskProcessorShardQPS"
// ReplicationBypassCorruptedData is the flag to bypass corrupted workflow data in source cluster
ReplicationBypassCorruptedData = "history.ReplicationBypassCorruptedData"
// ReplicationProcessorSchedulerQueueSize is the replication task executor queue size
ReplicationProcessorSchedulerQueueSize = "history.ReplicationProcessorSchedulerQueueSize"
// ReplicationProcessorSchedulerWorkerCount is the replication task executor worker count
ReplicationProcessorSchedulerWorkerCount = "history.ReplicationProcessorSchedulerWorkerCount"

2 changes: 2 additions & 0 deletions service/history/configs/config.go
@@ -234,6 +234,7 @@ type Config struct {
ReplicationTaskProcessorHostQPS dynamicconfig.FloatPropertyFn
ReplicationTaskProcessorShardQPS dynamicconfig.FloatPropertyFn
ReplicationBypassCorruptedData dynamicconfig.BoolPropertyFnWithNamespaceIDFilter
ReplicationProcessorSchedulerQueueSize dynamicconfig.IntPropertyFn
ReplicationProcessorSchedulerWorkerCount dynamicconfig.IntPropertyFn

// The following are used by consistent query
@@ -403,6 +404,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
ReplicationTaskProcessorHostQPS: dc.GetFloat64Property(dynamicconfig.ReplicationTaskProcessorHostQPS, 1500),
ReplicationTaskProcessorShardQPS: dc.GetFloat64Property(dynamicconfig.ReplicationTaskProcessorShardQPS, 30),
ReplicationBypassCorruptedData: dc.GetBoolPropertyFnWithNamespaceIDFilter(dynamicconfig.ReplicationBypassCorruptedData, false),
ReplicationProcessorSchedulerQueueSize: dc.GetIntProperty(dynamicconfig.ReplicationProcessorSchedulerQueueSize, 128),
ReplicationProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.ReplicationProcessorSchedulerWorkerCount, 512),

MaximumBufferedEventsBatch: dc.GetIntProperty(dynamicconfig.MaximumBufferedEventsBatch, 100),
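The two new knobs size the replication task scheduler: ReplicationProcessorSchedulerQueueSize (default 128) bounds the executor's task queue and ReplicationProcessorSchedulerWorkerCount (default 512) sets the number of executor workers. As a rough illustration of how IntPropertyFn-style knobs feed a bounded worker pool, here is a self-contained sketch; schedulerPool and its constructor are stand-ins for illustration only, not the actual Temporal scheduler types.

```go
package main

import "fmt"

// IntPropertyFn mirrors the dynamicconfig.IntPropertyFn shape used above: a
// function returning the current value of a dynamic config knob.
type IntPropertyFn func() int

func constant(v int) IntPropertyFn { return func() int { return v } }

// schedulerPool is an illustrative stand-in for the replication task
// scheduler: a bounded queue drained by a fixed number of worker goroutines.
type schedulerPool struct {
	tasks chan func()
}

func newSchedulerPool(queueSize, workerCount IntPropertyFn) *schedulerPool {
	p := &schedulerPool{tasks: make(chan func(), queueSize())}
	for i := 0; i < workerCount(); i++ {
		go func() {
			for task := range p.tasks {
				task()
			}
		}()
	}
	return p
}

func main() {
	// Defaults mirror NewConfig above: queue size 128, worker count 512.
	pool := newSchedulerPool(constant(128), constant(512))
	done := make(chan struct{})
	pool.tasks <- func() {
		fmt.Println("replication task executed")
		close(done)
	}
	<-done
}
```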
42 changes: 42 additions & 0 deletions service/history/replication/executable_activity_state_task.go
@@ -29,11 +29,15 @@ import (

"go.temporal.io/api/serviceerror"

enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/historyservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
replicationspb "go.temporal.io/server/api/replication/v1"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
serviceerrors "go.temporal.io/server/common/serviceerror"
ctasks "go.temporal.io/server/common/tasks"
)
@@ -142,3 +146,41 @@ func (e *ExecutableActivityStateTask) HandleErr(err error) error {
return err
}
}

func (e *ExecutableActivityStateTask) MarkPoisonPill() error {
shardContext, err := e.ShardController.GetShardByNamespaceWorkflow(
namespace.ID(e.NamespaceID),
e.WorkflowID,
)
if err != nil {
return err
}

// TODO: GetShardID will break GetDLQReplicationMessages; we need to handle DLQ for cross-shard replication.
req := &persistence.PutReplicationTaskToDLQRequest{
ShardID: shardContext.GetShardID(),
SourceClusterName: e.sourceClusterName,
TaskInfo: &persistencespb.ReplicationTaskInfo{
NamespaceId: e.NamespaceID,
WorkflowId: e.WorkflowID,
RunId: e.RunID,
TaskId: e.ExecutableTask.TaskID(),
TaskType: enumsspb.TASK_TYPE_REPLICATION_SYNC_ACTIVITY,
ScheduledEventId: e.req.ScheduledEventId,
Version: e.req.Version,
},
}

e.Logger.Error("enqueue activity state replication task to DLQ",
tag.ShardID(shardContext.GetShardID()),
tag.WorkflowNamespaceID(e.NamespaceID),
tag.WorkflowID(e.WorkflowID),
tag.WorkflowRunID(e.RunID),
tag.TaskID(e.ExecutableTask.TaskID()),
)

ctx, cancel := newTaskContext(e.NamespaceID)
defer cancel()

return shardContext.GetExecutionManager().PutReplicationTaskToDLQ(ctx, req)
}
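MarkPoisonPill resolves the shard that owns the workflow, converts the failed sync-activity task into a persistencespb.ReplicationTaskInfo, and writes it to that shard's replication DLQ via PutReplicationTaskToDLQ, so the replication stream can keep advancing past a task that cannot be applied. The sketch below shows one way a caller might combine retries with this hook, under assumed names (poisonableTask, processWithPoisonPill); it is not the actual processor code from this change.

```go
package replicationsketch

import "errors"

// poisonableTask captures only the methods used in this sketch; the real
// executable-task interface in this package is richer.
type poisonableTask interface {
	Execute() error
	HandleErr(err error) error
	MarkPoisonPill() error
}

// processWithPoisonPill retries a task a bounded number of times and, once
// the budget is spent, parks it in the replication DLQ so the stream is not
// blocked by a single bad task.
func processWithPoisonPill(task poisonableTask, maxAttempts int) error {
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err = task.HandleErr(task.Execute()); err == nil {
			return nil // task applied successfully
		}
	}
	if dlqErr := task.MarkPoisonPill(); dlqErr != nil {
		return errors.Join(err, dlqErr) // DLQ write failed; surface both errors
	}
	return nil // task recorded as a poison pill; processing continues
}
```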
39 changes: 36 additions & 3 deletions service/history/replication/executable_activity_state_task_test.go
@@ -38,15 +38,18 @@ import (
failurepb "go.temporal.io/api/failure/v1"
"go.temporal.io/api/serviceerror"

enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/history/v1"
"go.temporal.io/server/api/historyservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
replicationspb "go.temporal.io/server/api/replication/v1"
workflowspb "go.temporal.io/server/api/workflow/v1"
"go.temporal.io/server/client"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/primitives/timestamp"
serviceerrors "go.temporal.io/server/common/serviceerror"
"go.temporal.io/server/common/xdc"
@@ -71,7 +74,8 @@ type (
replicationTask *replicationspb.SyncActivityTaskAttributes
sourceClusterName string

task *ExecutableActivityStateTask
taskID int64
task *ExecutableActivityStateTask
}
)

@@ -116,7 +120,7 @@ func (s *executableActivityStateTaskSuite) SetupTest() {
VersionHistory: &history.VersionHistory{},
}
s.sourceClusterName = cluster.TestCurrentClusterName

s.taskID = rand.Int63()
s.task = NewExecutableActivityStateTask(
ProcessToolBox{
ClusterMetadata: s.clusterMetadata,
@@ -127,12 +131,13 @@
MetricsHandler: s.metricsHandler,
Logger: s.logger,
},
rand.Int63(),
s.taskID,
time.Unix(0, rand.Int63()),
s.replicationTask,
s.sourceClusterName,
)
s.task.ExecutableTask = s.executableTask
s.executableTask.EXPECT().TaskID().Return(s.taskID).AnyTimes()
}

func (s *executableActivityStateTaskSuite) TearDownTest() {
@@ -264,3 +269,31 @@ func (s *executableActivityStateTaskSuite) TestHandleErr_Other() {
err = serviceerror.NewUnavailable("")
s.Equal(err, s.task.HandleErr(err))
}

func (s *executableActivityStateTaskSuite) TestMarkPoisonPill() {
shardID := rand.Int31()
shardContext := shard.NewMockContext(s.controller)
executionManager := persistence.NewMockExecutionManager(s.controller)
s.shardController.EXPECT().GetShardByNamespaceWorkflow(
namespace.ID(s.task.NamespaceID),
s.task.WorkflowID,
).Return(shardContext, nil).AnyTimes()
shardContext.EXPECT().GetShardID().Return(shardID).AnyTimes()
shardContext.EXPECT().GetExecutionManager().Return(executionManager).AnyTimes()
executionManager.EXPECT().PutReplicationTaskToDLQ(gomock.Any(), &persistence.PutReplicationTaskToDLQRequest{
ShardID: shardID,
SourceClusterName: s.sourceClusterName,
TaskInfo: &persistencespb.ReplicationTaskInfo{
NamespaceId: s.task.NamespaceID,
WorkflowId: s.task.WorkflowID,
RunId: s.task.RunID,
TaskId: s.task.ExecutableTask.TaskID(),
TaskType: enumsspb.TASK_TYPE_REPLICATION_SYNC_ACTIVITY,
ScheduledEventId: s.task.req.ScheduledEventId,
Version: s.task.req.Version,
},
}).Return(nil)

err := s.task.MarkPoisonPill()
s.NoError(err)
}
66 changes: 66 additions & 0 deletions service/history/replication/executable_history_task.go
@@ -30,11 +30,16 @@ import (
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"

enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/historyservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
replicationspb "go.temporal.io/server/api/replication/v1"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/serialization"
serviceerrors "go.temporal.io/server/common/serviceerror"
ctasks "go.temporal.io/server/common/tasks"
)
@@ -138,3 +143,64 @@ func (e *ExecutableHistoryTask) HandleErr(err error) error {
return err
}
}

func (e *ExecutableHistoryTask) MarkPoisonPill() error {
shardContext, err := e.ShardController.GetShardByNamespaceWorkflow(
namespace.ID(e.NamespaceID),
e.WorkflowID,
)
if err != nil {
return err
}

events, err := serialization.NewSerializer().DeserializeEvents(e.req.Events)
if err != nil {
e.Logger.Error("unable to enqueue history replication task to DLQ, ser/de error",
tag.ShardID(shardContext.GetShardID()),
tag.WorkflowNamespaceID(e.NamespaceID),
tag.WorkflowID(e.WorkflowID),
tag.WorkflowRunID(e.RunID),
tag.TaskID(e.ExecutableTask.TaskID()),
tag.Error(err),
)
return nil
} else if len(events) == 0 {
e.Logger.Error("unable to enqueue history replication task to DLQ, no events",
tag.ShardID(shardContext.GetShardID()),
tag.WorkflowNamespaceID(e.NamespaceID),
tag.WorkflowID(e.WorkflowID),
tag.WorkflowRunID(e.RunID),
tag.TaskID(e.ExecutableTask.TaskID()),
)
return nil
}

// TODO: GetShardID will break GetDLQReplicationMessages; we need to handle DLQ for cross-shard replication.
req := &persistence.PutReplicationTaskToDLQRequest{
ShardID: shardContext.GetShardID(),
SourceClusterName: e.sourceClusterName,
TaskInfo: &persistencespb.ReplicationTaskInfo{
NamespaceId: e.NamespaceID,
WorkflowId: e.WorkflowID,
RunId: e.RunID,
TaskId: e.ExecutableTask.TaskID(),
TaskType: enumsspb.TASK_TYPE_REPLICATION_HISTORY,
FirstEventId: events[0].GetEventId(),
NextEventId: events[len(events)-1].GetEventId() + 1,
Version: events[0].GetVersion(),
},
}

e.Logger.Error("enqueue history replication task to DLQ",
tag.ShardID(shardContext.GetShardID()),
tag.WorkflowNamespaceID(e.NamespaceID),
tag.WorkflowID(e.WorkflowID),
tag.WorkflowRunID(e.RunID),
tag.TaskID(e.ExecutableTask.TaskID()),
)

ctx, cancel := newTaskContext(e.NamespaceID)
defer cancel()

return shardContext.GetExecutionManager().PutReplicationTaskToDLQ(ctx, req)
}
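Unlike the activity-state variant, the history task has to recover the event range for the DLQ record from the serialized events blob: FirstEventId is the first event's ID, NextEventId is the last event's ID plus one, and Version comes from the first event. If the blob cannot be deserialized or contains no events, the task is logged and dropped (nil is returned) rather than enqueued. A small sketch of that range derivation, using a pared-down event type instead of historypb.HistoryEvent, is below.

```go
package replicationsketch

// historyEvent is a pared-down stand-in for historypb.HistoryEvent.
type historyEvent struct {
	EventID int64
	Version int64
}

// dlqEventRange mirrors the bookkeeping in MarkPoisonPill above: the DLQ
// record covers [firstEventID, nextEventID) with the version of the first
// event; ok is false when there is nothing worth enqueueing.
func dlqEventRange(events []historyEvent) (firstEventID, nextEventID, version int64, ok bool) {
	if len(events) == 0 {
		return 0, 0, 0, false
	}
	first, last := events[0], events[len(events)-1]
	return first.EventID, last.EventID + 1, first.Version, true
}
```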
73 changes: 64 additions & 9 deletions service/history/replication/executable_history_task_test.go
@@ -35,17 +35,23 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"

enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/history/v1"
"go.temporal.io/server/api/historyservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
replicationspb "go.temporal.io/server/api/replication/v1"
workflowspb "go.temporal.io/server/api/workflow/v1"
"go.temporal.io/server/client"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/serialization"
serviceerrors "go.temporal.io/server/common/serviceerror"
"go.temporal.io/server/common/xdc"
"go.temporal.io/server/service/history/shard"
@@ -69,7 +75,8 @@ type (
replicationTask *replicationspb.HistoryTaskAttributes
sourceClusterName string

task *ExecutableHistoryTask
taskID int64
task *ExecutableHistoryTask
}
)

@@ -96,17 +103,33 @@ func (s *executableHistoryTaskSuite) SetupTest() {
s.metricsHandler = metrics.NoopMetricsHandler
s.logger = log.NewNoopLogger()
s.executableTask = NewMockExecutableTask(s.controller)

firstEventID := rand.Int63()
nextEventID := firstEventID + 1
version := rand.Int63()
events, _ := serialization.NewSerializer().SerializeEvents([]*historypb.HistoryEvent{{
EventId: firstEventID,
Version: version,
}}, enumspb.ENCODING_TYPE_PROTO3)
newEvents, _ := serialization.NewSerializer().SerializeEvents([]*historypb.HistoryEvent{{
EventId: 1,
Version: version,
}}, enumspb.ENCODING_TYPE_PROTO3)
s.replicationTask = &replicationspb.HistoryTaskAttributes{
NamespaceId: uuid.NewString(),
WorkflowId: uuid.NewString(),
RunId: uuid.NewString(),
BaseExecutionInfo: &workflowspb.BaseExecutionInfo{},
VersionHistoryItems: []*history.VersionHistoryItem{},
Events: &commonpb.DataBlob{},
NewRunEvents: &commonpb.DataBlob{},
NamespaceId: uuid.NewString(),
WorkflowId: uuid.NewString(),
RunId: uuid.NewString(),
BaseExecutionInfo: &workflowspb.BaseExecutionInfo{},
VersionHistoryItems: []*history.VersionHistoryItem{{
EventId: nextEventID - 1,
Version: version,
}},
Events: events,
NewRunEvents: newEvents,
}
s.sourceClusterName = cluster.TestCurrentClusterName

s.taskID = rand.Int63()
s.task = NewExecutableHistoryTask(
ProcessToolBox{
ClusterMetadata: s.clusterMetadata,
@@ -117,12 +140,13 @@
MetricsHandler: s.metricsHandler,
Logger: s.logger,
},
rand.Int63(),
s.taskID,
time.Unix(0, rand.Int63()),
s.replicationTask,
s.sourceClusterName,
)
s.task.ExecutableTask = s.executableTask
s.executableTask.EXPECT().TaskID().Return(s.taskID).AnyTimes()
}

func (s *executableHistoryTaskSuite) TearDownTest() {
@@ -242,3 +266,34 @@ func (s *executableHistoryTaskSuite) TestHandleErr_Other() {
err = serviceerror.NewUnavailable("")
s.Equal(err, s.task.HandleErr(err))
}

func (s *executableHistoryTaskSuite) TestMarkPoisonPill() {
events, _ := serialization.NewSerializer().DeserializeEvents(s.task.req.Events)

shardID := rand.Int31()
shardContext := shard.NewMockContext(s.controller)
executionManager := persistence.NewMockExecutionManager(s.controller)
s.shardController.EXPECT().GetShardByNamespaceWorkflow(
namespace.ID(s.task.NamespaceID),
s.task.WorkflowID,
).Return(shardContext, nil).AnyTimes()
shardContext.EXPECT().GetShardID().Return(shardID).AnyTimes()
shardContext.EXPECT().GetExecutionManager().Return(executionManager).AnyTimes()
executionManager.EXPECT().PutReplicationTaskToDLQ(gomock.Any(), &persistence.PutReplicationTaskToDLQRequest{
ShardID: shardID,
SourceClusterName: s.sourceClusterName,
TaskInfo: &persistencespb.ReplicationTaskInfo{
NamespaceId: s.task.NamespaceID,
WorkflowId: s.task.WorkflowID,
RunId: s.task.RunID,
TaskId: s.task.ExecutableTask.TaskID(),
TaskType: enumsspb.TASK_TYPE_REPLICATION_HISTORY,
FirstEventId: events[0].GetEventId(),
NextEventId: events[len(events)-1].GetEventId() + 1,
Version: events[0].GetVersion(),
},
}).Return(nil)

err := s.task.MarkPoisonPill()
s.NoError(err)
}
4 changes: 4 additions & 0 deletions service/history/replication/executable_noop_task.go
@@ -63,3 +63,7 @@ func (e *ExecutableNoopTask) Execute() error {
func (e *ExecutableNoopTask) HandleErr(err error) error {
return err
}

func (e *ExecutableNoopTask) MarkPoisonPill() error {
return nil
}
5 changes: 5 additions & 0 deletions service/history/replication/executable_noop_task_test.go
@@ -116,3 +116,8 @@ func (s *executableNoopTaskSuite) TestHandleErr() {
err = serviceerror.NewUnavailable("")
s.Equal(err, s.task.HandleErr(err))
}

func (s *executableNoopTaskSuite) TestMarkPoisonPill() {
err := s.task.MarkPoisonPill()
s.NoError(err)
}