From c221288353f8d58b04120dd1198d80386c8ddc2c Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Wed, 11 Jan 2023 16:25:52 -0800 Subject: [PATCH] Capture panic in replication task processing (#3799) --- service/history/replication/dlq_handler.go | 2 +- .../history/replication/dlq_handler_test.go | 2 +- service/history/replication/task_executor.go | 13 ++----- .../history/replication/task_executor_mock.go | 7 ++-- .../history/replication/task_executor_test.go | 10 ++--- service/history/replication/task_processor.go | 38 +++++++++++++++++-- .../replication/task_processor_test.go | 16 +++++++- 7 files changed, 62 insertions(+), 26 deletions(-) diff --git a/service/history/replication/dlq_handler.go b/service/history/replication/dlq_handler.go index 9c9bbaa0642..eadd24d11b1 100644 --- a/service/history/replication/dlq_handler.go +++ b/service/history/replication/dlq_handler.go @@ -208,7 +208,7 @@ func (r *dlqHandlerImpl) MergeMessages( } for _, task := range replicationTasks { - if _, err := taskExecutor.Execute( + if err := taskExecutor.Execute( ctx, task, true, diff --git a/service/history/replication/dlq_handler_test.go b/service/history/replication/dlq_handler_test.go index 7fc543e8054..18a19f2854e 100644 --- a/service/history/replication/dlq_handler_test.go +++ b/service/history/replication/dlq_handler_test.go @@ -291,7 +291,7 @@ func (s *dlqHandlerSuite) TestMergeMessages() { Return(&adminservice.GetDLQReplicationMessagesResponse{ ReplicationTasks: []*replicationspb.ReplicationTask{remoteTask}, }, nil) - s.taskExecutor.EXPECT().Execute(gomock.Any(), remoteTask, true).Return("", nil) + s.taskExecutor.EXPECT().Execute(gomock.Any(), remoteTask, true).Return(nil) s.executionManager.EXPECT().RangeDeleteReplicationTaskFromDLQ(gomock.Any(), &persistence.RangeDeleteReplicationTaskFromDLQRequest{ RangeCompleteHistoryTasksRequest: persistence.RangeCompleteHistoryTasksRequest{ ShardID: s.mockShard.GetShardID(), diff --git a/service/history/replication/task_executor.go b/service/history/replication/task_executor.go index 988ac59ef08..0785e5bdee0 100644 --- a/service/history/replication/task_executor.go +++ b/service/history/replication/task_executor.go @@ -49,7 +49,7 @@ import ( type ( TaskExecutor interface { - Execute(ctx context.Context, replicationTask *replicationspb.ReplicationTask, forceApply bool) (string, error) + Execute(ctx context.Context, replicationTask *replicationspb.ReplicationTask, forceApply bool) error } TaskExecutorParams struct { @@ -105,32 +105,25 @@ func (e *taskExecutorImpl) Execute( ctx context.Context, replicationTask *replicationspb.ReplicationTask, forceApply bool, -) (string, error) { +) error { var err error - var operation string switch replicationTask.GetTaskType() { case enumsspb.REPLICATION_TASK_TYPE_SYNC_SHARD_STATUS_TASK: // Shard status will be sent as part of the Replication message without kafka - operation = metrics.SyncShardTaskScope case enumsspb.REPLICATION_TASK_TYPE_SYNC_ACTIVITY_TASK: - operation = metrics.SyncActivityTaskScope err = e.handleActivityTask(ctx, replicationTask, forceApply) case enumsspb.REPLICATION_TASK_TYPE_HISTORY_METADATA_TASK: // Without kafka we should not have size limits so we don't necessary need this in the new replication scheme. - operation = metrics.HistoryMetadataReplicationTaskScope case enumsspb.REPLICATION_TASK_TYPE_HISTORY_V2_TASK: - operation = metrics.HistoryReplicationTaskScope err = e.handleHistoryReplicationTask(ctx, replicationTask, forceApply) case enumsspb.REPLICATION_TASK_TYPE_SYNC_WORKFLOW_STATE_TASK: - operation = metrics.SyncWorkflowStateTaskScope err = e.handleSyncWorkflowStateTask(ctx, replicationTask, forceApply) default: e.logger.Error("Unknown task type.") - operation = metrics.ReplicatorScope err = ErrUnknownReplicationTask } - return operation, err + return err } func (e *taskExecutorImpl) handleActivityTask( diff --git a/service/history/replication/task_executor_mock.go b/service/history/replication/task_executor_mock.go index 34782b70958..66c0910ebd7 100644 --- a/service/history/replication/task_executor_mock.go +++ b/service/history/replication/task_executor_mock.go @@ -60,12 +60,11 @@ func (m *MockTaskExecutor) EXPECT() *MockTaskExecutorMockRecorder { } // Execute mocks base method. -func (m *MockTaskExecutor) Execute(ctx context.Context, replicationTask *repication.ReplicationTask, forceApply bool) (string, error) { +func (m *MockTaskExecutor) Execute(ctx context.Context, replicationTask *repication.ReplicationTask, forceApply bool) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Execute", ctx, replicationTask, forceApply) - ret0, _ := ret[0].(string) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret0, _ := ret[0].(error) + return ret0 } // Execute indicates an expected call of Execute. diff --git a/service/history/replication/task_executor_test.go b/service/history/replication/task_executor_test.go index cbffe6b98dd..a6845ef09fb 100644 --- a/service/history/replication/task_executor_test.go +++ b/service/history/replication/task_executor_test.go @@ -231,7 +231,7 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncActivityReplicationTask() { } s.mockEngine.EXPECT().SyncActivity(gomock.Any(), request).Return(nil) - _, err := s.replicationTaskExecutor.Execute(context.Background(), task, true) + err := s.replicationTaskExecutor.Execute(context.Background(), task, true) s.NoError(err) } @@ -296,7 +296,7 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncActivityReplicationTask_Rese int64(456), ) s.mockEngine.EXPECT().SyncActivity(gomock.Any(), request).Return(nil) - _, err := s.replicationTaskExecutor.Execute(context.Background(), task, true) + err := s.replicationTaskExecutor.Execute(context.Background(), task, true) s.NoError(err) } @@ -329,7 +329,7 @@ func (s *taskExecutorSuite) TestProcess_HistoryReplicationTask() { } s.mockEngine.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(nil) - _, err := s.replicationTaskExecutor.Execute(context.Background(), task, true) + err := s.replicationTaskExecutor.Execute(context.Background(), task, true) s.NoError(err) } @@ -384,7 +384,7 @@ func (s *taskExecutorSuite) TestProcess_HistoryReplicationTask_Resend() { int64(456), ) s.mockEngine.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(nil) - _, err := s.replicationTaskExecutor.Execute(context.Background(), task, true) + err := s.replicationTaskExecutor.Execute(context.Background(), task, true) s.NoError(err) } @@ -405,6 +405,6 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncWorkflowStateTask() { s.mockEngine.EXPECT().ReplicateWorkflowState(gomock.Any(), gomock.Any()).Return(nil) - _, err := s.replicationTaskExecutor.Execute(context.Background(), task, true) + err := s.replicationTaskExecutor.Execute(context.Background(), task, true) s.NoError(err) } diff --git a/service/history/replication/task_processor.go b/service/history/replication/task_processor.go index 13c3bd553fa..5558e5c1bc9 100644 --- a/service/history/replication/task_processor.go +++ b/service/history/replication/task_processor.go @@ -324,14 +324,27 @@ func (p *taskProcessorImpl) handleSyncShardStatus( func (p *taskProcessorImpl) handleReplicationTask( ctx context.Context, replicationTask *replicationspb.ReplicationTask, -) error { +) (retErr error) { _ = p.rateLimiter.Wait(ctx) + operationTagValue := p.getOperationTagValue(replicationTask) + operation := func() error { - operation, err := p.replicationTaskExecutor.Execute(ctx, replicationTask, false) - p.emitTaskMetrics(operation, err) + err := p.replicationTaskExecutor.Execute(ctx, replicationTask, false) + p.emitTaskMetrics(operationTagValue, err) return err } + + var panicErr error + defer func() { + if panicErr != nil { + retErr = panicErr + p.emitTaskMetrics(operationTagValue, panicErr) + } + }() + + defer log.CapturePanic(p.logger, &panicErr) + return backoff.ThrottleRetry(operation, p.taskRetryPolicy, p.isRetryableError) } @@ -526,6 +539,25 @@ func (p *taskProcessorImpl) emitTaskMetrics(operation string, err error) { metricsScope.Counter(metrics.ReplicationTasksFailed.GetMetricName()).Record(1) } +func (p *taskProcessorImpl) getOperationTagValue( + replicationTask *replicationspb.ReplicationTask, +) string { + switch replicationTask.GetTaskType() { + case enumsspb.REPLICATION_TASK_TYPE_SYNC_SHARD_STATUS_TASK: + return metrics.SyncShardTaskScope + case enumsspb.REPLICATION_TASK_TYPE_SYNC_ACTIVITY_TASK: + return metrics.SyncActivityTaskScope + case enumsspb.REPLICATION_TASK_TYPE_HISTORY_METADATA_TASK: + return metrics.HistoryMetadataReplicationTaskScope + case enumsspb.REPLICATION_TASK_TYPE_HISTORY_V2_TASK: + return metrics.HistoryReplicationTaskScope + case enumsspb.REPLICATION_TASK_TYPE_SYNC_WORKFLOW_STATE_TASK: + return metrics.SyncWorkflowStateTaskScope + default: + return metrics.ReplicatorScope + } +} + func (p *taskProcessorImpl) isStopped() bool { return atomic.LoadInt32(&p.status) == common.DaemonStatusStopped } diff --git a/service/history/replication/task_processor_test.go b/service/history/replication/task_processor_test.go index c24a84f3b0e..0298e428939 100644 --- a/service/history/replication/task_processor_test.go +++ b/service/history/replication/task_processor_test.go @@ -204,7 +204,7 @@ func (s *taskProcessorSuite) TestHandleReplicationTask_SyncActivity() { VisibilityTime: &now, } - s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), task, false).Return("", nil) + s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), task, false).Return(nil) err := s.replicationTaskProcessor.handleReplicationTask(context.Background(), task) s.NoError(err) } @@ -243,11 +243,23 @@ func (s *taskProcessorSuite) TestHandleReplicationTask_History() { VisibilityTime: &now, } - s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), task, false).Return("", nil) + s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), task, false).Return(nil) err = s.replicationTaskProcessor.handleReplicationTask(context.Background(), task) s.NoError(err) } +func (s *taskProcessorSuite) TestHandleReplicationTask_Panic() { + task := &replicationspb.ReplicationTask{} + + s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), task, false).DoAndReturn( + func(_ context.Context, _ *replicationspb.ReplicationTask, _ bool) error { + panic("test replication task panic") + }, + ) + err := s.replicationTaskProcessor.handleReplicationTask(context.Background(), task) + s.Error(err) +} + func (s *taskProcessorSuite) TestHandleReplicationDLQTask_SyncActivity() { namespaceID := uuid.NewRandom().String() workflowID := uuid.New()