diff --git a/common/persistence/tests/cassandra_test_util.go b/common/persistence/tests/cassandra_test_util.go index 1c840626e8b..28f616dc123 100644 --- a/common/persistence/tests/cassandra_test_util.go +++ b/common/persistence/tests/cassandra_test_util.go @@ -32,6 +32,7 @@ import ( "sort" "strings" "testing" + "time" "github.com/blang/semver/v4" "github.com/gocql/gocql" @@ -241,10 +242,11 @@ func GetSchemaFiles(schemaDir string, logger log.Logger) []string { // NewCassandraConfig returns a new Cassandra config for test func NewCassandraConfig() *config.Cassandra { return &config.Cassandra{ - User: testCassandraUser, - Password: testCassandraPassword, - Hosts: environment.GetCassandraAddress(), - Port: environment.GetCassandraPort(), - Keyspace: testCassandraDatabaseNamePrefix + shuffle.String(testCassandraDatabaseNameSuffix), + User: testCassandraUser, + Password: testCassandraPassword, + Hosts: environment.GetCassandraAddress(), + Port: environment.GetCassandraPort(), + Keyspace: testCassandraDatabaseNamePrefix + shuffle.String(testCassandraDatabaseNameSuffix), + ConnectTimeout: 30 * time.Second, } } diff --git a/service/history/ndc/history_replicator.go b/service/history/ndc/history_replicator.go index e8dc3e6aaa0..8fc3d58f6d9 100644 --- a/service/history/ndc/history_replicator.go +++ b/service/history/ndc/history_replicator.go @@ -333,7 +333,7 @@ func (r *HistoryReplicatorImpl) ApplyWorkflowState( return err } - lastEventTime, lastFirstTxnID, err := r.backfillHistory( + _, lastFirstTxnID, err := r.backfillHistory( ctx, request.GetRemoteCluster(), namespaceID, @@ -377,7 +377,6 @@ func (r *HistoryReplicatorImpl) ApplyWorkflowState( } return r.transactionMgr.createWorkflow( ctx, - timestamp.TimeValue(lastEventTime), NewWorkflow( ctx, r.namespaceRegistry, @@ -499,7 +498,6 @@ func (r *HistoryReplicatorImpl) applyStartEvents( err = r.transactionMgr.createWorkflow( ctx, - task.getEventTime(), NewWorkflow( ctx, r.namespaceRegistry, @@ -637,7 +635,6 @@ func (r *HistoryReplicatorImpl) applyNonStartEventsToCurrentBranch( err = r.transactionMgr.updateWorkflow( ctx, - task.getEventTime(), isRebuilt, targetWorkflow, newWorkflow, @@ -883,7 +880,6 @@ func (r *HistoryReplicatorImpl) applyNonStartEventsResetWorkflow( err = r.transactionMgr.createWorkflow( ctx, - task.getEventTime(), targetWorkflow, ) if err != nil { diff --git a/service/history/ndc/transaction_manager.go b/service/history/ndc/transaction_manager.go index 0d9fec98ea2..663cdb8d3ee 100644 --- a/service/history/ndc/transaction_manager.go +++ b/service/history/ndc/transaction_manager.go @@ -28,7 +28,6 @@ package ndc import ( "context" - "time" "github.com/pborman/uuid" commonpb "go.temporal.io/api/common/v1" @@ -117,12 +116,10 @@ type ( transactionMgr interface { createWorkflow( ctx context.Context, - now time.Time, targetWorkflow Workflow, ) error updateWorkflow( ctx context.Context, - now time.Time, isWorkflowRebuilt bool, targetWorkflow Workflow, newWorkflow Workflow, @@ -204,20 +201,17 @@ func newTransactionMgr( func (r *transactionMgrImpl) createWorkflow( ctx context.Context, - now time.Time, targetWorkflow Workflow, ) error { return r.createMgr.dispatchForNewWorkflow( ctx, - now, targetWorkflow, ) } func (r *transactionMgrImpl) updateWorkflow( ctx context.Context, - now time.Time, isWorkflowRebuilt bool, targetWorkflow Workflow, newWorkflow Workflow, @@ -225,7 +219,6 @@ func (r *transactionMgrImpl) updateWorkflow( return r.updateMgr.dispatchForExistingWorkflow( ctx, - now, isWorkflowRebuilt, targetWorkflow, newWorkflow, diff --git a/service/history/ndc/transaction_manager_existing_workflow.go b/service/history/ndc/transaction_manager_existing_workflow.go index ec96a68d0ac..e549c31e720 100644 --- a/service/history/ndc/transaction_manager_existing_workflow.go +++ b/service/history/ndc/transaction_manager_existing_workflow.go @@ -29,7 +29,6 @@ package ndc import ( "context" "fmt" - "time" "go.temporal.io/api/serviceerror" @@ -42,7 +41,6 @@ type ( transactionMgrForExistingWorkflow interface { dispatchForExistingWorkflow( ctx context.Context, - now time.Time, isWorkflowRebuilt bool, targetWorkflow Workflow, newWorkflow Workflow, @@ -67,7 +65,6 @@ func newNDCTransactionMgrForExistingWorkflow( func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchForExistingWorkflow( ctx context.Context, - now time.Time, isWorkflowRebuilt bool, targetWorkflow Workflow, newWorkflow Workflow, @@ -84,7 +81,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchForExistingWorkflow( // update to current record, since target workflow is pointed by current record return r.dispatchWorkflowUpdateAsCurrent( ctx, - now, isWorkflowRebuilt, targetWorkflow, newWorkflow, @@ -120,7 +116,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchForExistingWorkflow( // update to current record, since target workflow is pointed by current record return r.dispatchWorkflowUpdateAsCurrent( ctx, - now, isWorkflowRebuilt, targetWorkflow, newWorkflow, @@ -147,7 +142,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchForExistingWorkflow( // target workflow is older than current workflow, need to suppress the target workflow return r.dispatchWorkflowUpdateAsZombie( ctx, - now, isWorkflowRebuilt, currentWorkflow, targetWorkflow, @@ -159,7 +153,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchForExistingWorkflow( // will set target workflow using snapshot return r.executeTransaction( ctx, - now, nDCTransactionPolicySuppressCurrentAndUpdateAsCurrent, currentWorkflow, targetWorkflow, @@ -169,7 +162,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchForExistingWorkflow( func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchWorkflowUpdateAsCurrent( ctx context.Context, - now time.Time, isWorkflowRebuilt bool, targetWorkflow Workflow, newWorkflow Workflow, @@ -178,7 +170,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchWorkflowUpdateAsCurre if !isWorkflowRebuilt { return r.executeTransaction( ctx, - now, nDCTransactionPolicyUpdateAsCurrent, nil, targetWorkflow, @@ -188,7 +179,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchWorkflowUpdateAsCurre return r.executeTransaction( ctx, - now, nDCTransactionPolicyConflictResolveAsCurrent, nil, targetWorkflow, @@ -198,7 +188,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchWorkflowUpdateAsCurre func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchWorkflowUpdateAsZombie( ctx context.Context, - now time.Time, isWorkflowRebuilt bool, currentWorkflow Workflow, targetWorkflow Workflow, @@ -208,7 +197,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchWorkflowUpdateAsZombi if !isWorkflowRebuilt { return r.executeTransaction( ctx, - now, nDCTransactionPolicyUpdateAsZombie, currentWorkflow, targetWorkflow, @@ -218,7 +206,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchWorkflowUpdateAsZombi return r.executeTransaction( ctx, - now, nDCTransactionPolicyConflictResolveAsZombie, currentWorkflow, targetWorkflow, @@ -228,7 +215,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchWorkflowUpdateAsZombi func (r *nDCTransactionMgrForExistingWorkflowImpl) updateAsCurrent( ctx context.Context, - now time.Time, targetWorkflow Workflow, newWorkflow Workflow, ) error { @@ -246,7 +232,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) updateAsCurrent( func (r *nDCTransactionMgrForExistingWorkflowImpl) updateAsZombie( ctx context.Context, - now time.Time, currentWorkflow Workflow, targetWorkflow Workflow, newWorkflow Workflow, @@ -319,7 +304,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) updateAsZombie( func (r *nDCTransactionMgrForExistingWorkflowImpl) suppressCurrentAndUpdateAsCurrent( ctx context.Context, - now time.Time, currentWorkflow Workflow, targetWorkflow Workflow, newWorkflow Workflow, @@ -364,7 +348,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) suppressCurrentAndUpdateAsCur func (r *nDCTransactionMgrForExistingWorkflowImpl) conflictResolveAsCurrent( ctx context.Context, - now time.Time, targetWorkflow Workflow, newWorkflow Workflow, ) error { @@ -390,7 +373,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) conflictResolveAsCurrent( func (r *nDCTransactionMgrForExistingWorkflowImpl) conflictResolveAsZombie( ctx context.Context, - now time.Time, currentWorkflow Workflow, targetWorkflow Workflow, newWorkflow Workflow, @@ -462,7 +444,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) conflictResolveAsZombie( func (r *nDCTransactionMgrForExistingWorkflowImpl) executeTransaction( ctx context.Context, - now time.Time, transactionPolicy nDCTransactionPolicy, currentWorkflow Workflow, targetWorkflow Workflow, @@ -482,7 +463,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) executeTransaction( case nDCTransactionPolicyUpdateAsCurrent: return r.updateAsCurrent( ctx, - now, targetWorkflow, newWorkflow, ) @@ -490,7 +470,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) executeTransaction( case nDCTransactionPolicyUpdateAsZombie: return r.updateAsZombie( ctx, - now, currentWorkflow, targetWorkflow, newWorkflow, @@ -499,7 +478,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) executeTransaction( case nDCTransactionPolicySuppressCurrentAndUpdateAsCurrent: return r.suppressCurrentAndUpdateAsCurrent( ctx, - now, currentWorkflow, targetWorkflow, newWorkflow, @@ -508,7 +486,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) executeTransaction( case nDCTransactionPolicyConflictResolveAsCurrent: return r.conflictResolveAsCurrent( ctx, - now, targetWorkflow, newWorkflow, ) @@ -516,7 +493,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) executeTransaction( case nDCTransactionPolicyConflictResolveAsZombie: return r.conflictResolveAsZombie( ctx, - now, currentWorkflow, targetWorkflow, newWorkflow, diff --git a/service/history/ndc/transaction_manager_existing_workflow_mock.go b/service/history/ndc/transaction_manager_existing_workflow_mock.go index 6ceb322db2e..9b33ececb2e 100644 --- a/service/history/ndc/transaction_manager_existing_workflow_mock.go +++ b/service/history/ndc/transaction_manager_existing_workflow_mock.go @@ -31,7 +31,6 @@ package ndc import ( context "context" reflect "reflect" - time "time" gomock "github.com/golang/mock/gomock" ) @@ -60,15 +59,15 @@ func (m *MocktransactionMgrForExistingWorkflow) EXPECT() *MocktransactionMgrForE } // dispatchForExistingWorkflow mocks base method. -func (m *MocktransactionMgrForExistingWorkflow) dispatchForExistingWorkflow(ctx context.Context, now time.Time, isWorkflowRebuilt bool, targetWorkflow, newWorkflow Workflow) error { +func (m *MocktransactionMgrForExistingWorkflow) dispatchForExistingWorkflow(ctx context.Context, isWorkflowRebuilt bool, targetWorkflow, newWorkflow Workflow) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "dispatchForExistingWorkflow", ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow) + ret := m.ctrl.Call(m, "dispatchForExistingWorkflow", ctx, isWorkflowRebuilt, targetWorkflow, newWorkflow) ret0, _ := ret[0].(error) return ret0 } // dispatchForExistingWorkflow indicates an expected call of dispatchForExistingWorkflow. -func (mr *MocktransactionMgrForExistingWorkflowMockRecorder) dispatchForExistingWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow interface{}) *gomock.Call { +func (mr *MocktransactionMgrForExistingWorkflowMockRecorder) dispatchForExistingWorkflow(ctx, isWorkflowRebuilt, targetWorkflow, newWorkflow interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "dispatchForExistingWorkflow", reflect.TypeOf((*MocktransactionMgrForExistingWorkflow)(nil).dispatchForExistingWorkflow), ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "dispatchForExistingWorkflow", reflect.TypeOf((*MocktransactionMgrForExistingWorkflow)(nil).dispatchForExistingWorkflow), ctx, isWorkflowRebuilt, targetWorkflow, newWorkflow) } diff --git a/service/history/ndc/transaction_manager_existing_workflow_test.go b/service/history/ndc/transaction_manager_existing_workflow_test.go index 22da85d131d..6fb1bc794fc 100644 --- a/service/history/ndc/transaction_manager_existing_workflow_test.go +++ b/service/history/ndc/transaction_manager_existing_workflow_test.go @@ -27,7 +27,6 @@ package ndc import ( "context" "testing" - "time" "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" @@ -72,7 +71,6 @@ func (s *transactionMgrForExistingWorkflowSuite) TearDownTest() { func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow_NoRebuild_CurrentWorkflowGuaranteed() { ctx := context.Background() - now := time.Now().UTC() isWorkflowRebuilt := false @@ -103,7 +101,7 @@ func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow newMutableState, ).Return(nil) - err := s.updateMgr.dispatchForExistingWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow) + err := s.updateMgr.dispatchForExistingWorkflow(ctx, isWorkflowRebuilt, targetWorkflow, newWorkflow) s.NoError(err) s.True(targetReleaseCalled) s.True(newReleaseCalled) @@ -111,7 +109,6 @@ func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow_NoRebuild_CurrentWorkflowNotGuaranteed_IsCurrent() { ctx := context.Background() - now := time.Now().UTC() namespaceID := namespace.ID("some random namespace ID") workflowID := "some random workflow ID" @@ -135,13 +132,12 @@ func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow }).AnyTimes() s.mockTransactionMgr.EXPECT().getCurrentWorkflowRunID(ctx, namespaceID, workflowID).Return(targetRunID, nil) - err := s.updateMgr.dispatchForExistingWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow) + err := s.updateMgr.dispatchForExistingWorkflow(ctx, isWorkflowRebuilt, targetWorkflow, newWorkflow) s.Error(err) } func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow_NoRebuild_CurrentWorkflowNotGuaranteed_NotCurrent_CurrentRunning_UpdateAsCurrent() { ctx := context.Background() - now := time.Now().UTC() namespaceID := namespace.ID("some random namespace ID") workflowID := "some random workflow ID" @@ -207,7 +203,7 @@ func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow currentWorkflowPolicy.Ptr(), ).Return(nil) - err := s.updateMgr.dispatchForExistingWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow) + err := s.updateMgr.dispatchForExistingWorkflow(ctx, isWorkflowRebuilt, targetWorkflow, newWorkflow) s.NoError(err) s.True(targetReleaseCalled) s.True(newReleaseCalled) @@ -216,7 +212,6 @@ func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow_NoRebuild_CurrentWorkflowNotGuaranteed_NotCurrent_CurrentComplete_UpdateAsCurrent() { ctx := context.Background() - now := time.Now().UTC() namespaceID := namespace.ID("some random namespace ID") workflowID := "some random workflow ID" @@ -282,7 +277,7 @@ func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow currentWorkflowPolicy.Ptr(), ).Return(nil) - err := s.updateMgr.dispatchForExistingWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow) + err := s.updateMgr.dispatchForExistingWorkflow(ctx, isWorkflowRebuilt, targetWorkflow, newWorkflow) s.NoError(err) s.True(targetReleaseCalled) s.True(newReleaseCalled) @@ -291,7 +286,6 @@ func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow_NoRebuild_CurrentWorkflowNotGuaranteed_NotCurrent_UpdateAsZombie_NewRunDoesNotExists() { ctx := context.Background() - now := time.Now().UTC() namespaceID := namespace.ID("some random namespace ID") workflowID := "some random workflow ID" @@ -357,7 +351,7 @@ func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow workflow.TransactionPolicyPassive.Ptr(), ).Return(nil) - err := s.updateMgr.dispatchForExistingWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow) + err := s.updateMgr.dispatchForExistingWorkflow(ctx, isWorkflowRebuilt, targetWorkflow, newWorkflow) s.NoError(err) s.True(targetReleaseCalled) s.True(newReleaseCalled) @@ -366,7 +360,6 @@ func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow_NoRebuild_CurrentWorkflowNotGuaranteed_NotCurrent_UpdateAsZombie_NewRunDoesExists() { ctx := context.Background() - now := time.Now().UTC() namespaceID := namespace.ID("some random namespace ID") workflowID := "some random workflow ID" @@ -432,7 +425,7 @@ func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow (*workflow.TransactionPolicy)(nil), ).Return(nil) - err := s.updateMgr.dispatchForExistingWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow) + err := s.updateMgr.dispatchForExistingWorkflow(ctx, isWorkflowRebuilt, targetWorkflow, newWorkflow) s.NoError(err) s.True(targetReleaseCalled) s.True(newReleaseCalled) @@ -441,7 +434,6 @@ func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow_Rebuild_IsCurrent() { ctx := context.Background() - now := time.Now().UTC() namespaceID := namespace.ID("some random namespace ID") workflowID := "some random workflow ID" @@ -488,7 +480,7 @@ func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow (*workflow.TransactionPolicy)(nil), ).Return(nil) - err := s.updateMgr.dispatchForExistingWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow) + err := s.updateMgr.dispatchForExistingWorkflow(ctx, isWorkflowRebuilt, targetWorkflow, newWorkflow) s.NoError(err) s.True(targetReleaseCalled) s.True(newReleaseCalled) @@ -496,7 +488,6 @@ func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow_Rebuild_NotCurrent_UpdateAsCurrent() { ctx := context.Background() - now := time.Now().UTC() namespaceID := namespace.ID("some random namespace ID") workflowID := "some random workflow ID" @@ -561,7 +552,7 @@ func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow currentWorkflowPolicy.Ptr(), ).Return(nil) - err := s.updateMgr.dispatchForExistingWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow) + err := s.updateMgr.dispatchForExistingWorkflow(ctx, isWorkflowRebuilt, targetWorkflow, newWorkflow) s.NoError(err) s.True(targetReleaseCalled) s.True(newReleaseCalled) @@ -570,7 +561,6 @@ func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow_Rebuild_NotCurrent_UpdateAsZombie_NewRunDoesNotExists() { ctx := context.Background() - now := time.Now().UTC() namespaceID := namespace.ID("some random namespace ID") workflowID := "some random workflow ID" @@ -637,7 +627,7 @@ func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow (*workflow.TransactionPolicy)(nil), ).Return(nil) - err := s.updateMgr.dispatchForExistingWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow) + err := s.updateMgr.dispatchForExistingWorkflow(ctx, isWorkflowRebuilt, targetWorkflow, newWorkflow) s.NoError(err) s.True(targetReleaseCalled) s.True(newReleaseCalled) @@ -646,7 +636,6 @@ func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow_Rebuild_NotCurrent_UpdateAsZombie_NewRunDoesExists() { ctx := context.Background() - now := time.Now().UTC() namespaceID := namespace.ID("some random namespace ID") workflowID := "some random workflow ID" @@ -713,7 +702,7 @@ func (s *transactionMgrForExistingWorkflowSuite) TestDispatchForExistingWorkflow (*workflow.TransactionPolicy)(nil), ).Return(nil) - err := s.updateMgr.dispatchForExistingWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow) + err := s.updateMgr.dispatchForExistingWorkflow(ctx, isWorkflowRebuilt, targetWorkflow, newWorkflow) s.NoError(err) s.True(targetReleaseCalled) s.True(newReleaseCalled) diff --git a/service/history/ndc/transaction_manager_mock.go b/service/history/ndc/transaction_manager_mock.go index dfd2475f827..5c8daa285dd 100644 --- a/service/history/ndc/transaction_manager_mock.go +++ b/service/history/ndc/transaction_manager_mock.go @@ -31,7 +31,6 @@ package ndc import ( context "context" reflect "reflect" - time "time" gomock "github.com/golang/mock/gomock" namespace "go.temporal.io/server/common/namespace" @@ -96,17 +95,17 @@ func (mr *MocktransactionMgrMockRecorder) checkWorkflowExists(ctx, namespaceID, } // createWorkflow mocks base method. -func (m *MocktransactionMgr) createWorkflow(ctx context.Context, now time.Time, targetWorkflow Workflow) error { +func (m *MocktransactionMgr) createWorkflow(ctx context.Context, targetWorkflow Workflow) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "createWorkflow", ctx, now, targetWorkflow) + ret := m.ctrl.Call(m, "createWorkflow", ctx, targetWorkflow) ret0, _ := ret[0].(error) return ret0 } // createWorkflow indicates an expected call of createWorkflow. -func (mr *MocktransactionMgrMockRecorder) createWorkflow(ctx, now, targetWorkflow interface{}) *gomock.Call { +func (mr *MocktransactionMgrMockRecorder) createWorkflow(ctx, targetWorkflow interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "createWorkflow", reflect.TypeOf((*MocktransactionMgr)(nil).createWorkflow), ctx, now, targetWorkflow) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "createWorkflow", reflect.TypeOf((*MocktransactionMgr)(nil).createWorkflow), ctx, targetWorkflow) } // getCurrentWorkflowRunID mocks base method. @@ -140,15 +139,15 @@ func (mr *MocktransactionMgrMockRecorder) loadWorkflow(ctx, namespaceID, workflo } // updateWorkflow mocks base method. -func (m *MocktransactionMgr) updateWorkflow(ctx context.Context, now time.Time, isWorkflowRebuilt bool, targetWorkflow, newWorkflow Workflow) error { +func (m *MocktransactionMgr) updateWorkflow(ctx context.Context, isWorkflowRebuilt bool, targetWorkflow, newWorkflow Workflow) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "updateWorkflow", ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow) + ret := m.ctrl.Call(m, "updateWorkflow", ctx, isWorkflowRebuilt, targetWorkflow, newWorkflow) ret0, _ := ret[0].(error) return ret0 } // updateWorkflow indicates an expected call of updateWorkflow. -func (mr *MocktransactionMgrMockRecorder) updateWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow interface{}) *gomock.Call { +func (mr *MocktransactionMgrMockRecorder) updateWorkflow(ctx, isWorkflowRebuilt, targetWorkflow, newWorkflow interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "updateWorkflow", reflect.TypeOf((*MocktransactionMgr)(nil).updateWorkflow), ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "updateWorkflow", reflect.TypeOf((*MocktransactionMgr)(nil).updateWorkflow), ctx, isWorkflowRebuilt, targetWorkflow, newWorkflow) } diff --git a/service/history/ndc/transaction_manager_new_workflow.go b/service/history/ndc/transaction_manager_new_workflow.go index d65d82622ca..8b5d25d7fd2 100644 --- a/service/history/ndc/transaction_manager_new_workflow.go +++ b/service/history/ndc/transaction_manager_new_workflow.go @@ -29,7 +29,6 @@ package ndc import ( "context" "fmt" - "time" "go.temporal.io/api/serviceerror" @@ -42,7 +41,6 @@ type ( transactionMgrForNewWorkflow interface { dispatchForNewWorkflow( ctx context.Context, - now time.Time, targetWorkflow Workflow, ) error } @@ -65,9 +63,9 @@ func newTransactionMgrForNewWorkflow( func (r *nDCTransactionMgrForNewWorkflowImpl) dispatchForNewWorkflow( ctx context.Context, - now time.Time, targetWorkflow Workflow, ) error { + // NOTE: this function does NOT mutate current workflow or target workflow, // workflow mutation is done in methods within executeTransaction function @@ -92,7 +90,6 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) dispatchForNewWorkflow( // current record does not exists return r.executeTransaction( ctx, - now, nDCTransactionPolicyCreateAsCurrent, nil, targetWorkflow, @@ -119,7 +116,6 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) dispatchForNewWorkflow( // target workflow is older than current workflow, need to suppress the target workflow return r.executeTransaction( ctx, - now, nDCTransactionPolicyCreateAsZombie, currentWorkflow, targetWorkflow, @@ -132,7 +128,6 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) dispatchForNewWorkflow( // proceed to create workflow return r.executeTransaction( ctx, - now, nDCTransactionPolicyCreateAsCurrent, currentWorkflow, targetWorkflow, @@ -142,7 +137,6 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) dispatchForNewWorkflow( // current workflow is still running, need to suppress the current workflow return r.executeTransaction( ctx, - now, nDCTransactionPolicySuppressCurrentAndCreateAsCurrent, currentWorkflow, targetWorkflow, @@ -151,7 +145,6 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) dispatchForNewWorkflow( func (r *nDCTransactionMgrForNewWorkflowImpl) createAsCurrent( ctx context.Context, - now time.Time, currentWorkflow Workflow, targetWorkflow Workflow, ) error { @@ -200,7 +193,6 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) createAsCurrent( func (r *nDCTransactionMgrForNewWorkflowImpl) createAsZombie( ctx context.Context, - now time.Time, currentWorkflow Workflow, targetWorkflow Workflow, ) error { @@ -260,7 +252,6 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) createAsZombie( func (r *nDCTransactionMgrForNewWorkflowImpl) suppressCurrentAndCreateAsCurrent( ctx context.Context, - now time.Time, currentWorkflow Workflow, targetWorkflow Workflow, ) error { @@ -287,7 +278,6 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) suppressCurrentAndCreateAsCurrent( func (r *nDCTransactionMgrForNewWorkflowImpl) executeTransaction( ctx context.Context, - now time.Time, transactionPolicy nDCTransactionPolicy, currentWorkflow Workflow, targetWorkflow Workflow, @@ -306,7 +296,6 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) executeTransaction( case nDCTransactionPolicyCreateAsCurrent: return r.createAsCurrent( ctx, - now, currentWorkflow, targetWorkflow, ) @@ -314,7 +303,6 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) executeTransaction( case nDCTransactionPolicyCreateAsZombie: return r.createAsZombie( ctx, - now, currentWorkflow, targetWorkflow, ) @@ -322,7 +310,6 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) executeTransaction( case nDCTransactionPolicySuppressCurrentAndCreateAsCurrent: return r.suppressCurrentAndCreateAsCurrent( ctx, - now, currentWorkflow, targetWorkflow, ) diff --git a/service/history/ndc/transaction_manager_new_workflow_mock.go b/service/history/ndc/transaction_manager_new_workflow_mock.go index d1b1e2cabcf..a79f62d1657 100644 --- a/service/history/ndc/transaction_manager_new_workflow_mock.go +++ b/service/history/ndc/transaction_manager_new_workflow_mock.go @@ -31,7 +31,6 @@ package ndc import ( context "context" reflect "reflect" - time "time" gomock "github.com/golang/mock/gomock" ) @@ -60,15 +59,15 @@ func (m *MocktransactionMgrForNewWorkflow) EXPECT() *MocktransactionMgrForNewWor } // dispatchForNewWorkflow mocks base method. -func (m *MocktransactionMgrForNewWorkflow) dispatchForNewWorkflow(ctx context.Context, now time.Time, targetWorkflow Workflow) error { +func (m *MocktransactionMgrForNewWorkflow) dispatchForNewWorkflow(ctx context.Context, targetWorkflow Workflow) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "dispatchForNewWorkflow", ctx, now, targetWorkflow) + ret := m.ctrl.Call(m, "dispatchForNewWorkflow", ctx, targetWorkflow) ret0, _ := ret[0].(error) return ret0 } // dispatchForNewWorkflow indicates an expected call of dispatchForNewWorkflow. -func (mr *MocktransactionMgrForNewWorkflowMockRecorder) dispatchForNewWorkflow(ctx, now, targetWorkflow interface{}) *gomock.Call { +func (mr *MocktransactionMgrForNewWorkflowMockRecorder) dispatchForNewWorkflow(ctx, targetWorkflow interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "dispatchForNewWorkflow", reflect.TypeOf((*MocktransactionMgrForNewWorkflow)(nil).dispatchForNewWorkflow), ctx, now, targetWorkflow) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "dispatchForNewWorkflow", reflect.TypeOf((*MocktransactionMgrForNewWorkflow)(nil).dispatchForNewWorkflow), ctx, targetWorkflow) } diff --git a/service/history/ndc/transaction_manager_new_workflow_test.go b/service/history/ndc/transaction_manager_new_workflow_test.go index 566f7530f5a..9a968f1beea 100644 --- a/service/history/ndc/transaction_manager_new_workflow_test.go +++ b/service/history/ndc/transaction_manager_new_workflow_test.go @@ -26,8 +26,8 @@ package ndc import ( "context" + "math/rand" "testing" - "time" "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" @@ -74,7 +74,6 @@ func (s *transactionMgrForNewWorkflowSuite) TearDownTest() { func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_Dup() { ctx := context.Background() - now := time.Now().UTC() namespaceID := namespace.ID("some random namespace ID") workflowID := "some random workflow ID" @@ -94,13 +93,12 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_Dup() { s.mockTransactionMgr.EXPECT().getCurrentWorkflowRunID(ctx, namespaceID, workflowID).Return(runID, nil) - err := s.createMgr.dispatchForNewWorkflow(ctx, now, newWorkflow) + err := s.createMgr.dispatchForNewWorkflow(ctx, newWorkflow) s.NoError(err) } -func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_BrandNew_FirstEvents() { +func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_BrandNew() { ctx := context.Background() - now := time.Now().UTC() namespaceID := namespace.ID("some random namespace ID") workflowID := "some random workflow ID" @@ -119,7 +117,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_BrandNew_ workflowSnapshot := &persistence.WorkflowSnapshot{} workflowEventsSeq := []*persistence.WorkflowEvents{{ Events: []*historypb.HistoryEvent{{ - EventId: common.FirstEventID, + EventId: common.FirstEventID + rand.Int63(), }}, }} mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ @@ -147,68 +145,13 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_BrandNew_ workflowEventsSeq, ).Return(nil) - err := s.createMgr.dispatchForNewWorkflow(ctx, now, newWorkflow) + err := s.createMgr.dispatchForNewWorkflow(ctx, newWorkflow) s.NoError(err) s.True(releaseCalled) } -func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_BrandNew_NonFirstEvents() { +func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsCurrent() { ctx := context.Background() - now := time.Now().UTC() - - namespaceID := namespace.ID("some random namespace ID") - workflowID := "some random workflow ID" - runID := "some random run ID" - - releaseCalled := false - - newWorkflow := NewMockWorkflow(s.controller) - weContext := workflow.NewMockContext(s.controller) - mutableState := workflow.NewMockMutableState(s.controller) - var releaseFn wcache.ReleaseCacheFunc = func(error) { releaseCalled = true } - newWorkflow.EXPECT().GetContext().Return(weContext).AnyTimes() - newWorkflow.EXPECT().GetMutableState().Return(mutableState).AnyTimes() - newWorkflow.EXPECT().GetReleaseFn().Return(releaseFn).AnyTimes() - - workflowSnapshot := &persistence.WorkflowSnapshot{} - workflowEventsSeq := []*persistence.WorkflowEvents{{ - Events: []*historypb.HistoryEvent{{ - EventId: common.FirstEventID + 1, - }}, - }} - mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ - NamespaceId: namespaceID.String(), - WorkflowId: workflowID, - }).AnyTimes() - mutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{ - RunId: runID, - }).AnyTimes() - mutableState.EXPECT().CloseTransactionAsSnapshot(workflow.TransactionPolicyPassive).Return( - workflowSnapshot, workflowEventsSeq, nil, - ) - - s.mockTransactionMgr.EXPECT().getCurrentWorkflowRunID( - ctx, namespaceID, workflowID, - ).Return("", nil) - - weContext.EXPECT().CreateWorkflowExecution( - gomock.Any(), - persistence.CreateWorkflowModeBrandNew, - "", - int64(0), - mutableState, - workflowSnapshot, - workflowEventsSeq, - ).Return(nil) - - err := s.createMgr.dispatchForNewWorkflow(ctx, now, newWorkflow) - s.NoError(err) - s.True(releaseCalled) -} - -func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsCurrent_FirstEvents() { - ctx := context.Background() - now := time.Now().UTC() namespaceID := namespace.ID("some random namespace ID") workflowID := "some random workflow ID" @@ -236,7 +179,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsC targetWorkflowSnapshot := &persistence.WorkflowSnapshot{} targetWorkflowEventsSeq := []*persistence.WorkflowEvents{{ Events: []*historypb.HistoryEvent{{ - EventId: common.FirstEventID, + EventId: common.FirstEventID + rand.Int63(), }}, }} targetMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ @@ -274,158 +217,14 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsC targetWorkflowEventsSeq, ).Return(nil) - err := s.createMgr.dispatchForNewWorkflow(ctx, now, targetWorkflow) - s.NoError(err) - s.True(targetReleaseCalled) - s.True(currentReleaseCalled) -} - -func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsCurrent_NonFirstEvents() { - ctx := context.Background() - now := time.Now().UTC() - - namespaceID := namespace.ID("some random namespace ID") - workflowID := "some random workflow ID" - targetRunID := "some random run ID" - currentRunID := "other random runID" - currentLastWriteVersion := int64(4321) - - targetReleaseCalled := false - currentReleaseCalled := false - - targetWorkflow := NewMockWorkflow(s.controller) - targetContext := workflow.NewMockContext(s.controller) - targetMutableState := workflow.NewMockMutableState(s.controller) - var targetReleaseFn wcache.ReleaseCacheFunc = func(error) { targetReleaseCalled = true } - targetWorkflow.EXPECT().GetContext().Return(targetContext).AnyTimes() - targetWorkflow.EXPECT().GetMutableState().Return(targetMutableState).AnyTimes() - targetWorkflow.EXPECT().GetReleaseFn().Return(targetReleaseFn).AnyTimes() - - currentWorkflow := NewMockWorkflow(s.controller) - currentMutableState := workflow.NewMockMutableState(s.controller) - var currentReleaseFn wcache.ReleaseCacheFunc = func(error) { currentReleaseCalled = true } - currentWorkflow.EXPECT().GetMutableState().Return(currentMutableState).AnyTimes() - currentWorkflow.EXPECT().GetReleaseFn().Return(currentReleaseFn).AnyTimes() - - targetWorkflowSnapshot := &persistence.WorkflowSnapshot{} - targetWorkflowEventsSeq := []*persistence.WorkflowEvents{{ - Events: []*historypb.HistoryEvent{{ - EventId: common.FirstEventID + 1, - }}, - }} - targetMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ - NamespaceId: namespaceID.String(), - WorkflowId: workflowID, - }).AnyTimes() - targetMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{ - RunId: targetRunID, - }).AnyTimes() - targetMutableState.EXPECT().CloseTransactionAsSnapshot(workflow.TransactionPolicyPassive).Return( - targetWorkflowSnapshot, targetWorkflowEventsSeq, nil, - ) - - s.mockTransactionMgr.EXPECT().getCurrentWorkflowRunID(ctx, namespaceID, workflowID).Return(currentRunID, nil) - s.mockTransactionMgr.EXPECT().loadWorkflow(ctx, namespaceID, workflowID, currentRunID).Return(currentWorkflow, nil) - - targetWorkflow.EXPECT().HappensAfter(currentWorkflow).Return(true, nil) - currentMutableState.EXPECT().IsWorkflowExecutionRunning().Return(false).AnyTimes() - currentMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ - NamespaceId: namespaceID.String(), - WorkflowId: workflowID, - }).AnyTimes() - currentMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{ - RunId: currentRunID, - }).AnyTimes() - currentWorkflow.EXPECT().GetVectorClock().Return(currentLastWriteVersion, int64(0), nil) - - targetContext.EXPECT().CreateWorkflowExecution( - gomock.Any(), - persistence.CreateWorkflowModeUpdateCurrent, - currentRunID, - currentLastWriteVersion, - targetMutableState, - targetWorkflowSnapshot, - targetWorkflowEventsSeq, - ).Return(nil) - - err := s.createMgr.dispatchForNewWorkflow(ctx, now, targetWorkflow) - s.NoError(err) - s.True(targetReleaseCalled) - s.True(currentReleaseCalled) -} - -func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZombie_FirstEvents() { - ctx := context.Background() - now := time.Now().UTC() - - namespaceID := namespace.ID("some random namespace ID") - workflowID := "some random workflow ID" - targetRunID := "some random run ID" - currentRunID := "other random runID" - - targetReleaseCalled := false - currentReleaseCalled := false - - targetWorkflow := NewMockWorkflow(s.controller) - targetContext := workflow.NewMockContext(s.controller) - targetMutableState := workflow.NewMockMutableState(s.controller) - var targetReleaseFn wcache.ReleaseCacheFunc = func(error) { targetReleaseCalled = true } - targetWorkflow.EXPECT().GetContext().Return(targetContext).AnyTimes() - targetWorkflow.EXPECT().GetMutableState().Return(targetMutableState).AnyTimes() - targetWorkflow.EXPECT().GetReleaseFn().Return(targetReleaseFn).AnyTimes() - - currentWorkflow := NewMockWorkflow(s.controller) - var currentReleaseFn wcache.ReleaseCacheFunc = func(error) { currentReleaseCalled = true } - currentWorkflow.EXPECT().GetReleaseFn().Return(currentReleaseFn).AnyTimes() - - targetWorkflowSnapshot := &persistence.WorkflowSnapshot{ - ExecutionInfo: &persistencespb.WorkflowExecutionInfo{ - NamespaceId: namespaceID.String(), - WorkflowId: workflowID, - }, - } - targetWorkflowEventsSeq := []*persistence.WorkflowEvents{{ - Events: []*historypb.HistoryEvent{{ - EventId: common.FirstEventID, - }}, - }} - targetMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ - NamespaceId: namespaceID.String(), - WorkflowId: workflowID, - }).AnyTimes() - targetMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{ - RunId: targetRunID, - }).AnyTimes() - targetMutableState.EXPECT().CloseTransactionAsSnapshot(workflow.TransactionPolicyPassive).Return( - targetWorkflowSnapshot, targetWorkflowEventsSeq, nil, - ) - - s.mockTransactionMgr.EXPECT().getCurrentWorkflowRunID(ctx, namespaceID, workflowID).Return(currentRunID, nil) - s.mockTransactionMgr.EXPECT().loadWorkflow(ctx, namespaceID, workflowID, currentRunID).Return(currentWorkflow, nil) - - targetWorkflow.EXPECT().HappensAfter(currentWorkflow).Return(false, nil) - targetWorkflow.EXPECT().SuppressBy(currentWorkflow).Return(workflow.TransactionPolicyPassive, nil) - - targetContext.EXPECT().CreateWorkflowExecution( - gomock.Any(), - persistence.CreateWorkflowModeBypassCurrent, - "", - int64(0), - targetMutableState, - targetWorkflowSnapshot, - targetWorkflowEventsSeq, - ).Return(nil) - targetContext.EXPECT().ReapplyEvents(gomock.Any(), targetWorkflowEventsSeq).Return(nil) - - err := s.createMgr.dispatchForNewWorkflow(ctx, now, targetWorkflow) + err := s.createMgr.dispatchForNewWorkflow(ctx, targetWorkflow) s.NoError(err) s.True(targetReleaseCalled) s.True(currentReleaseCalled) } -func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZombie_NonFirstEvents() { +func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZombie() { ctx := context.Background() - now := time.Now().UTC() namespaceID := namespace.ID("some random namespace ID") workflowID := "some random workflow ID" @@ -455,7 +254,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZ } targetWorkflowEventsSeq := []*persistence.WorkflowEvents{{ Events: []*historypb.HistoryEvent{{ - EventId: common.FirstEventID + 1, + EventId: common.FirstEventID + rand.Int63(), }}, }} targetMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ @@ -486,84 +285,14 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZ ).Return(nil) targetContext.EXPECT().ReapplyEvents(gomock.Any(), targetWorkflowEventsSeq).Return(nil) - err := s.createMgr.dispatchForNewWorkflow(ctx, now, targetWorkflow) - s.NoError(err) - s.True(targetReleaseCalled) - s.True(currentReleaseCalled) -} - -func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZombie_Dedup_FirstEvents() { - ctx := context.Background() - now := time.Now().UTC() - - namespaceID := namespace.ID("some random namespace ID") - workflowID := "some random workflow ID" - targetRunID := "some random run ID" - currentRunID := "other random runID" - - targetReleaseCalled := false - currentReleaseCalled := false - - targetWorkflow := NewMockWorkflow(s.controller) - targetContext := workflow.NewMockContext(s.controller) - targetMutableState := workflow.NewMockMutableState(s.controller) - var targetReleaseFn wcache.ReleaseCacheFunc = func(error) { targetReleaseCalled = true } - targetWorkflow.EXPECT().GetContext().Return(targetContext).AnyTimes() - targetWorkflow.EXPECT().GetMutableState().Return(targetMutableState).AnyTimes() - targetWorkflow.EXPECT().GetReleaseFn().Return(targetReleaseFn).AnyTimes() - - currentWorkflow := NewMockWorkflow(s.controller) - var currentReleaseFn wcache.ReleaseCacheFunc = func(error) { currentReleaseCalled = true } - currentWorkflow.EXPECT().GetReleaseFn().Return(currentReleaseFn).AnyTimes() - - targetWorkflowSnapshot := &persistence.WorkflowSnapshot{ - ExecutionInfo: &persistencespb.WorkflowExecutionInfo{ - NamespaceId: namespaceID.String(), - WorkflowId: workflowID, - }, - } - targetWorkflowEventsSeq := []*persistence.WorkflowEvents{{ - Events: []*historypb.HistoryEvent{{ - EventId: common.FirstEventID, - }}, - }} - targetMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ - NamespaceId: namespaceID.String(), - WorkflowId: workflowID, - }).AnyTimes() - targetMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{ - RunId: targetRunID, - }).AnyTimes() - targetMutableState.EXPECT().CloseTransactionAsSnapshot(workflow.TransactionPolicyPassive).Return( - targetWorkflowSnapshot, targetWorkflowEventsSeq, nil, - ) - - s.mockTransactionMgr.EXPECT().getCurrentWorkflowRunID(ctx, namespaceID, workflowID).Return(currentRunID, nil) - s.mockTransactionMgr.EXPECT().loadWorkflow(ctx, namespaceID, workflowID, currentRunID).Return(currentWorkflow, nil) - - targetWorkflow.EXPECT().HappensAfter(currentWorkflow).Return(false, nil) - targetWorkflow.EXPECT().SuppressBy(currentWorkflow).Return(workflow.TransactionPolicyPassive, nil) - - targetContext.EXPECT().CreateWorkflowExecution( - gomock.Any(), - persistence.CreateWorkflowModeBypassCurrent, - "", - int64(0), - targetMutableState, - targetWorkflowSnapshot, - targetWorkflowEventsSeq, - ).Return(&persistence.WorkflowConditionFailedError{}) - targetContext.EXPECT().ReapplyEvents(gomock.Any(), targetWorkflowEventsSeq).Return(nil) - - err := s.createMgr.dispatchForNewWorkflow(ctx, now, targetWorkflow) + err := s.createMgr.dispatchForNewWorkflow(ctx, targetWorkflow) s.NoError(err) s.True(targetReleaseCalled) s.True(currentReleaseCalled) } -func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZombie_Dedup_NonFirstEvents() { +func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZombie_Dedup() { ctx := context.Background() - now := time.Now().UTC() namespaceID := namespace.ID("some random namespace ID") workflowID := "some random workflow ID" @@ -593,7 +322,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZ } targetWorkflowEventsSeq := []*persistence.WorkflowEvents{{ Events: []*historypb.HistoryEvent{{ - EventId: common.FirstEventID + 1, + EventId: common.FirstEventID + rand.Int63(), }}, }} targetMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ @@ -624,7 +353,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZ ).Return(&persistence.WorkflowConditionFailedError{}) targetContext.EXPECT().ReapplyEvents(gomock.Any(), targetWorkflowEventsSeq).Return(nil) - err := s.createMgr.dispatchForNewWorkflow(ctx, now, targetWorkflow) + err := s.createMgr.dispatchForNewWorkflow(ctx, targetWorkflow) s.NoError(err) s.True(targetReleaseCalled) s.True(currentReleaseCalled) @@ -632,7 +361,6 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_SuppressCurrentAndCreateAsCurrent() { ctx := context.Background() - now := time.Now().UTC() namespaceID := namespace.ID("some random namespace ID") workflowID := "some random workflow ID" @@ -684,7 +412,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_SuppressC workflow.TransactionPolicyPassive.Ptr(), ).Return(nil) - err := s.createMgr.dispatchForNewWorkflow(ctx, now, targetWorkflow) + err := s.createMgr.dispatchForNewWorkflow(ctx, targetWorkflow) s.NoError(err) s.True(targetReleaseCalled) s.True(currentReleaseCalled) diff --git a/service/history/ndc/transaction_manager_test.go b/service/history/ndc/transaction_manager_test.go index 556fc03aaa3..a795d4e89dc 100644 --- a/service/history/ndc/transaction_manager_test.go +++ b/service/history/ndc/transaction_manager_test.go @@ -28,7 +28,6 @@ import ( "context" "math/rand" "testing" - "time" "github.com/golang/mock/gomock" "github.com/pborman/uuid" @@ -115,29 +114,27 @@ func (s *transactionMgrSuite) TearDownTest() { func (s *transactionMgrSuite) TestCreateWorkflow() { ctx := context.Background() - now := time.Now().UTC() targetWorkflow := NewMockWorkflow(s.controller) s.mockCreateMgr.EXPECT().dispatchForNewWorkflow( - ctx, now, targetWorkflow, + ctx, targetWorkflow, ).Return(nil) - err := s.transactionMgr.createWorkflow(ctx, now, targetWorkflow) + err := s.transactionMgr.createWorkflow(ctx, targetWorkflow) s.NoError(err) } func (s *transactionMgrSuite) TestUpdateWorkflow() { ctx := context.Background() - now := time.Now().UTC() isWorkflowRebuilt := true targetWorkflow := NewMockWorkflow(s.controller) newWorkflow := NewMockWorkflow(s.controller) s.mockUpdateMgr.EXPECT().dispatchForExistingWorkflow( - ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow, + ctx, isWorkflowRebuilt, targetWorkflow, newWorkflow, ).Return(nil) - err := s.transactionMgr.updateWorkflow(ctx, now, isWorkflowRebuilt, targetWorkflow, newWorkflow) + err := s.transactionMgr.updateWorkflow(ctx, isWorkflowRebuilt, targetWorkflow, newWorkflow) s.NoError(err) } diff --git a/service/history/ndc/workflow.go b/service/history/ndc/workflow.go index 7ba4eb81328..826b2221559 100644 --- a/service/history/ndc/workflow.go +++ b/service/history/ndc/workflow.go @@ -54,6 +54,8 @@ type ( GetMutableState() workflow.MutableState GetReleaseFn() wcache.ReleaseCacheFunc GetVectorClock() (int64, int64, error) + LastWriteByLocalCluster() (bool, error) + HappensAfter(that Workflow) (bool, error) Revive() error SuppressBy(incomingWorkflow Workflow) (workflow.TransactionPolicy, error) @@ -114,6 +116,16 @@ func (r *WorkflowImpl) GetVectorClock() (int64, int64, error) { return lastWriteVersion, lastEventTaskID, nil } +func (r *WorkflowImpl) LastWriteByLocalCluster() (bool, error) { + lastWriteVersion, err := r.mutableState.GetLastWriteVersion() + if err != nil { + return false, err + } + lastWriteCluster := r.clusterMetadata.ClusterNameForFailoverVersion(true, lastWriteVersion) + currentCluster := r.clusterMetadata.GetCurrentClusterName() + return lastWriteCluster == currentCluster, nil +} + func (r *WorkflowImpl) HappensAfter( that Workflow, ) (bool, error) { @@ -196,7 +208,10 @@ func (r *WorkflowImpl) SuppressBy( if currentCluster == lastWriteCluster { return workflow.TransactionPolicyActive, r.terminateWorkflow(lastWriteVersion, incomingLastWriteVersion) } - return workflow.TransactionPolicyPassive, r.zombiefyWorkflow() + return workflow.TransactionPolicyPassive, r.mutableState.UpdateWorkflowStateStatus( + enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE, + enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, + ) } func (r *WorkflowImpl) FlushBufferedEvents() error { @@ -296,14 +311,6 @@ func (r *WorkflowImpl) terminateWorkflow( return err } -func (r *WorkflowImpl) zombiefyWorkflow() error { - - return r.mutableState.UpdateWorkflowStateStatus( - enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE, - enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, - ) -} - func WorkflowHappensAfter( thisLastWriteVersion int64, thisLastEventTaskID int64, diff --git a/service/history/ndc/workflow_mock.go b/service/history/ndc/workflow_mock.go index 84676b51b2a..5394b0e477f 100644 --- a/service/history/ndc/workflow_mock.go +++ b/service/history/ndc/workflow_mock.go @@ -146,6 +146,21 @@ func (mr *MockWorkflowMockRecorder) HappensAfter(that interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HappensAfter", reflect.TypeOf((*MockWorkflow)(nil).HappensAfter), that) } +// LastWriteByLocalCluster mocks base method. +func (m *MockWorkflow) LastWriteByLocalCluster() (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LastWriteByLocalCluster") + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// LastWriteByLocalCluster indicates an expected call of LastWriteByLocalCluster. +func (mr *MockWorkflowMockRecorder) LastWriteByLocalCluster() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LastWriteByLocalCluster", reflect.TypeOf((*MockWorkflow)(nil).LastWriteByLocalCluster)) +} + // Revive mocks base method. func (m *MockWorkflow) Revive() error { m.ctrl.T.Helper() diff --git a/service/history/workflow/mutable_state_state_status.go b/service/history/workflow/mutable_state_state_status.go index 80c1c79d4cf..bfc10f959ef 100644 --- a/service/history/workflow/mutable_state_state_status.go +++ b/service/history/workflow/mutable_state_state_status.go @@ -135,7 +135,7 @@ func setStateStatus( } case enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE: - if status == enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING { + if status != enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING { return invalidStateTransitionErr(e.GetState(), state, status) }