diff --git a/service/history/ndc/history_replicator.go b/service/history/ndc/history_replicator.go index 8220ad218ef..f00c0de0f94 100644 --- a/service/history/ndc/history_replicator.go +++ b/service/history/ndc/history_replicator.go @@ -64,36 +64,47 @@ type ( transactionManager transactionManager logger log.Logger - newBranchManager branchManagerProvider - newConflictResolver conflictResolverProvider - newWorkflowResetter workflowResetterProvider - newStateBuilder stateBuilderProvider - newMutableState mutableStateProvider - newReplicationTaskFn newReplicationTaskFn + newBranchManagerFn newBranchManagerFn + newConflictResolverFn newConflictResolverFn + newWorkflowResetterFn newWorkflowResetterFn + newStateBuilderFn newStateBuilderFn + newMutableStateFn newMutableStateFn + + // refactored functions for a better testability + newReplicationTaskFn newReplicationTaskFn + applyStartEventsFn applyStartEventsFn + applyNonStartEventsPrepareBranchFn applyNonStartEventsPrepareBranchFn + applyNonStartEventsPrepareMutableStateFn applyNonStartEventsPrepareMutableStateFn + applyNonStartEventsToCurrentBranchFn applyNonStartEventsToCurrentBranchFn + applyNonStartEventsToNoneCurrentBranchFn applyNonStartEventsToNoneCurrentBranchFn + applyNonStartEventsToNoneCurrentBranchWithContinueAsNewFn applyNonStartEventsToNoneCurrentBranchWithContinueAsNewFn + applyNonStartEventsToNoneCurrentBranchWithoutContinueAsNewFn applyNonStartEventsToNoneCurrentBranchWithoutContinueAsNewFn + applyNonStartEventsMissingMutableStateFn applyNonStartEventsMissingMutableStateFn + applyNonStartEventsResetWorkflowFn applyNonStartEventsResetWorkflowFn } - stateBuilderProvider func( + newStateBuilderFn func( mutableState execution.MutableState, logger log.Logger) execution.StateBuilder - mutableStateProvider func( + newMutableStateFn func( domainEntry *cache.DomainCacheEntry, logger log.Logger, ) execution.MutableState - branchManagerProvider func( + newBranchManagerFn func( context execution.Context, mutableState execution.MutableState, logger log.Logger, ) branchManager - conflictResolverProvider func( + newConflictResolverFn func( context execution.Context, mutableState execution.MutableState, logger log.Logger, ) conflictResolver - workflowResetterProvider func( + newWorkflowResetterFn func( domainID string, workflowID string, baseRunID string, @@ -109,6 +120,99 @@ type ( logger log.Logger, request *types.ReplicateEventsV2Request, ) (replicationTask, error) + + applyStartEventsFn func( + ctx ctx.Context, + context execution.Context, + releaseFn execution.ReleaseFunc, + task replicationTask, + domainCache cache.DomainCache, + newMutableState newMutableStateFn, + newStateBuilder newStateBuilderFn, + transactionManager transactionManager, + logger log.Logger, + shard shard.Context, + clusterMetadata cluster.Metadata, + ) (retError error) + + applyNonStartEventsPrepareBranchFn func( + ctx ctx.Context, + context execution.Context, + mutableState execution.MutableState, + task replicationTask, + newBranchManager newBranchManagerFn, + ) (bool, int, error) + + applyNonStartEventsPrepareMutableStateFn func( + ctx ctx.Context, + context execution.Context, + mutableState execution.MutableState, + branchIndex int, + task replicationTask, + newConflictResolver newConflictResolverFn, + ) (execution.MutableState, bool, error) + + applyNonStartEventsToCurrentBranchFn func( + ctx ctx.Context, + context execution.Context, + mutableState execution.MutableState, + isRebuilt bool, + releaseFn execution.ReleaseFunc, + task replicationTask, + newStateBuilder newStateBuilderFn, + clusterMetadata cluster.Metadata, + shard shard.Context, + logger log.Logger, + transactionManager transactionManager, + ) error + + applyNonStartEventsToNoneCurrentBranchFn func( + ctx ctx.Context, + context execution.Context, + mutableState execution.MutableState, + branchIndex int, + releaseFn execution.ReleaseFunc, + task replicationTask, + r *historyReplicatorImpl, + ) error + + applyNonStartEventsToNoneCurrentBranchWithContinueAsNewFn func( + ctx ctx.Context, + context execution.Context, + releaseFn execution.ReleaseFunc, + task replicationTask, + r *historyReplicatorImpl, + ) error + + applyNonStartEventsToNoneCurrentBranchWithoutContinueAsNewFn func( + ctx ctx.Context, + context execution.Context, + mutableState execution.MutableState, + branchIndex int, + releaseFn execution.ReleaseFunc, + task replicationTask, + transactionManager transactionManager, + clusterMetadata cluster.Metadata, + ) error + + applyNonStartEventsMissingMutableStateFn func( + ctx ctx.Context, + newContext execution.Context, + task replicationTask, + newWorkflowResetter newWorkflowResetterFn, + ) (execution.MutableState, error) + + applyNonStartEventsResetWorkflowFn func( + ctx ctx.Context, + context execution.Context, + mutableState execution.MutableState, + task replicationTask, + newStateBuilder newStateBuilderFn, + transactionManager transactionManager, + clusterMetadata cluster.Metadata, + logger log.Logger, + shard shard.Context, + ) error ) var _ HistoryReplicator = (*historyReplicatorImpl)(nil) @@ -136,21 +240,21 @@ func NewHistoryReplicator( eventsReapplier: eventsReapplier, logger: logger.WithTags(tag.ComponentHistoryReplicator), - newBranchManager: func( + newBranchManagerFn: func( context execution.Context, mutableState execution.MutableState, logger log.Logger, ) branchManager { return newBranchManager(shard, context, mutableState, logger) }, - newConflictResolver: func( + newConflictResolverFn: func( context execution.Context, mutableState execution.MutableState, logger log.Logger, ) conflictResolver { return newConflictResolver(shard, context, mutableState, logger) }, - newWorkflowResetter: func( + newWorkflowResetterFn: func( domainID string, workflowID string, baseRunID string, @@ -160,7 +264,7 @@ func NewHistoryReplicator( ) WorkflowResetter { return NewWorkflowResetter(shard, transactionManager, domainID, workflowID, baseRunID, newContext, newRunID, logger) }, - newStateBuilder: func( + newStateBuilderFn: func( state execution.MutableState, logger log.Logger, ) execution.StateBuilder { @@ -171,7 +275,7 @@ func NewHistoryReplicator( state, ) }, - newMutableState: func( + newMutableStateFn: func( domainEntry *cache.DomainCacheEntry, logger log.Logger, ) execution.MutableState { @@ -181,7 +285,16 @@ func NewHistoryReplicator( domainEntry, ) }, - newReplicationTaskFn: newReplicationTask, + newReplicationTaskFn: newReplicationTask, + applyStartEventsFn: applyStartEvents, + applyNonStartEventsPrepareBranchFn: applyNonStartEventsPrepareBranch, + applyNonStartEventsPrepareMutableStateFn: applyNonStartEventsPrepareMutableState, + applyNonStartEventsToCurrentBranchFn: applyNonStartEventsToCurrentBranch, + applyNonStartEventsToNoneCurrentBranchFn: applyNonStartEventsToNoneCurrentBranch, + applyNonStartEventsToNoneCurrentBranchWithContinueAsNewFn: applyNonStartEventsToNoneCurrentBranchWithContinueAsNew, + applyNonStartEventsToNoneCurrentBranchWithoutContinueAsNewFn: applyNonStartEventsToNoneCurrentBranchWithoutContinueAsNew, + applyNonStartEventsMissingMutableStateFn: applyNonStartEventsMissingMutableState, + applyNonStartEventsResetWorkflowFn: applyNonStartEventsResetWorkflow, } return replicator @@ -233,7 +346,8 @@ func (r *historyReplicatorImpl) applyEvents( switch task.getFirstEvent().GetEventType() { case types.EventTypeWorkflowExecutionStarted: - return r.applyStartEvents(ctx, context, releaseFn, task) + return r.applyStartEventsFn(ctx, context, releaseFn, task, r.domainCache, + r.newMutableStateFn, r.newStateBuilderFn, r.transactionManager, r.logger, r.shard, r.clusterMetadata) default: // apply events, other than simple start workflow execution @@ -246,7 +360,7 @@ func (r *historyReplicatorImpl) applyEvents( return execution.ErrMissingVersionHistories } - doContinue, branchIndex, err := r.applyNonStartEventsPrepareBranch(ctx, context, mutableState, task) + doContinue, branchIndex, err := r.applyNonStartEventsPrepareBranchFn(ctx, context, mutableState, task, r.newBranchManagerFn) if err != nil { return err } else if !doContinue { @@ -254,24 +368,27 @@ func (r *historyReplicatorImpl) applyEvents( return nil } - mutableState, isRebuilt, err := r.applyNonStartEventsPrepareMutableState(ctx, context, mutableState, branchIndex, task) + mutableState, isRebuilt, err := r.applyNonStartEventsPrepareMutableStateFn(ctx, context, mutableState, branchIndex, task, r.newConflictResolverFn) if err != nil { return err } if mutableState.GetVersionHistories().GetCurrentVersionHistoryIndex() == branchIndex { - return r.applyNonStartEventsToCurrentBranch(ctx, context, mutableState, isRebuilt, releaseFn, task) + return r.applyNonStartEventsToCurrentBranchFn(ctx, context, mutableState, isRebuilt, releaseFn, task, + r.newStateBuilderFn, r.clusterMetadata, r.shard, r.logger, r.transactionManager) } - return r.applyNonStartEventsToNoneCurrentBranch(ctx, context, mutableState, branchIndex, releaseFn, task) + // passed in r because there's a recursive call within applyNonStartEventsToNoneCurrentBranchWithContinueAsNew + return r.applyNonStartEventsToNoneCurrentBranchFn(ctx, context, mutableState, branchIndex, releaseFn, task, r) case *types.EntityNotExistsError: // mutable state not created, check if is workflow reset - mutableState, err := r.applyNonStartEventsMissingMutableState(ctx, context, task) + mutableState, err := r.applyNonStartEventsMissingMutableStateFn(ctx, context, task, r.newWorkflowResetterFn) if err != nil { return err } - return r.applyNonStartEventsResetWorkflow(ctx, context, mutableState, task) + return r.applyNonStartEventsResetWorkflowFn(ctx, context, mutableState, task, + r.newStateBuilderFn, r.transactionManager, r.clusterMetadata, r.logger, r.shard) default: // unable to get mutable state, return err so we can retry the task later @@ -280,20 +397,27 @@ func (r *historyReplicatorImpl) applyEvents( } } -func (r *historyReplicatorImpl) applyStartEvents( +func applyStartEvents( ctx ctx.Context, context execution.Context, releaseFn execution.ReleaseFunc, task replicationTask, + domainCache cache.DomainCache, + newMutableState newMutableStateFn, + newStateBuilder newStateBuilderFn, + transactionManager transactionManager, + logger log.Logger, + shard shard.Context, + clusterMetadata cluster.Metadata, ) (retError error) { - domainEntry, err := r.domainCache.GetDomainByID(task.getDomainID()) + domainEntry, err := domainCache.GetDomainByID(task.getDomainID()) if err != nil { return err } requestID := uuid.New() // requestID used for start workflow execution request. This is not on the history event. - mutableState := r.newMutableState(domainEntry, task.getLogger()) - stateBuilder := r.newStateBuilder(mutableState, task.getLogger()) + mutableState := newMutableState(domainEntry, task.getLogger()) + stateBuilder := newStateBuilder(mutableState, task.getLogger()) // use state builder for workflow mutable state mutation _, err = stateBuilder.ApplyEvents( @@ -311,12 +435,12 @@ func (r *historyReplicatorImpl) applyStartEvents( return err } - err = r.transactionManager.createWorkflow( + err = transactionManager.createWorkflow( ctx, task.getEventTime(), execution.NewWorkflow( ctx, - r.clusterMetadata, + clusterMetadata, context, mutableState, releaseFn, @@ -328,20 +452,21 @@ func (r *historyReplicatorImpl) applyStartEvents( tag.Error(err), ) } else { - r.notify(task.getSourceCluster(), task.getEventTime()) + notify(task.getSourceCluster(), task.getEventTime(), logger, shard, clusterMetadata) } return err } -func (r *historyReplicatorImpl) applyNonStartEventsPrepareBranch( +func applyNonStartEventsPrepareBranch( ctx ctx.Context, context execution.Context, mutableState execution.MutableState, task replicationTask, + newBranchManager newBranchManagerFn, ) (bool, int, error) { incomingVersionHistory := task.getVersionHistory() - branchManager := r.newBranchManager(context, mutableState, task.getLogger()) + branchManager := newBranchManager(context, mutableState, task.getLogger()) doContinue, versionHistoryIndex, err := branchManager.prepareVersionHistory( ctx, incomingVersionHistory, @@ -364,16 +489,17 @@ func (r *historyReplicatorImpl) applyNonStartEventsPrepareBranch( } } -func (r *historyReplicatorImpl) applyNonStartEventsPrepareMutableState( +func applyNonStartEventsPrepareMutableState( ctx ctx.Context, context execution.Context, mutableState execution.MutableState, branchIndex int, task replicationTask, + newConflictResolver newConflictResolverFn, ) (execution.MutableState, bool, error) { incomingVersion := task.getVersion() - conflictResolver := r.newConflictResolver(context, mutableState, task.getLogger()) + conflictResolver := newConflictResolver(context, mutableState, task.getLogger()) mutableState, isRebuilt, err := conflictResolver.prepareMutableState( ctx, branchIndex, @@ -388,17 +514,22 @@ func (r *historyReplicatorImpl) applyNonStartEventsPrepareMutableState( return mutableState, isRebuilt, err } -func (r *historyReplicatorImpl) applyNonStartEventsToCurrentBranch( +func applyNonStartEventsToCurrentBranch( ctx ctx.Context, context execution.Context, mutableState execution.MutableState, isRebuilt bool, releaseFn execution.ReleaseFunc, task replicationTask, + newStateBuilder newStateBuilderFn, + clusterMetadata cluster.Metadata, + shard shard.Context, + logger log.Logger, + transactionManager transactionManager, ) error { requestID := uuid.New() // requestID used for start workflow execution request. This is not on the history event. - stateBuilder := r.newStateBuilder(mutableState, task.getLogger()) + stateBuilder := newStateBuilder(mutableState, task.getLogger()) newMutableState, err := stateBuilder.ApplyEvents( task.getDomainID(), requestID, @@ -416,7 +547,7 @@ func (r *historyReplicatorImpl) applyNonStartEventsToCurrentBranch( targetWorkflow := execution.NewWorkflow( ctx, - r.clusterMetadata, + clusterMetadata, context, mutableState, releaseFn, @@ -431,21 +562,21 @@ func (r *historyReplicatorImpl) applyNonStartEventsToCurrentBranch( WorkflowID: newExecutionInfo.WorkflowID, RunID: newExecutionInfo.RunID, }, - r.shard, - r.shard.GetExecutionManager(), - r.logger, + shard, + shard.GetExecutionManager(), + logger, ) newWorkflow = execution.NewWorkflow( ctx, - r.clusterMetadata, + clusterMetadata, newContext, newMutableState, execution.NoopReleaseFn, ) } - err = r.transactionManager.updateWorkflow( + err = transactionManager.updateWorkflow( ctx, task.getEventTime(), isRebuilt, @@ -458,46 +589,52 @@ func (r *historyReplicatorImpl) applyNonStartEventsToCurrentBranch( tag.Error(err), ) } else { - r.notify(task.getSourceCluster(), task.getEventTime()) + notify(task.getSourceCluster(), task.getEventTime(), logger, shard, clusterMetadata) } return err } -func (r *historyReplicatorImpl) applyNonStartEventsToNoneCurrentBranch( +func applyNonStartEventsToNoneCurrentBranch( ctx ctx.Context, context execution.Context, mutableState execution.MutableState, branchIndex int, releaseFn execution.ReleaseFunc, task replicationTask, + r *historyReplicatorImpl, ) error { if len(task.getNewEvents()) != 0 { - return r.applyNonStartEventsToNoneCurrentBranchWithContinueAsNew( + return r.applyNonStartEventsToNoneCurrentBranchWithContinueAsNewFn( ctx, context, releaseFn, task, + r, ) } - return r.applyNonStartEventsToNoneCurrentBranchWithoutContinueAsNew( + return r.applyNonStartEventsToNoneCurrentBranchWithoutContinueAsNewFn( ctx, context, mutableState, branchIndex, releaseFn, task, + r.transactionManager, + r.clusterMetadata, ) } -func (r *historyReplicatorImpl) applyNonStartEventsToNoneCurrentBranchWithoutContinueAsNew( +func applyNonStartEventsToNoneCurrentBranchWithoutContinueAsNew( ctx ctx.Context, context execution.Context, mutableState execution.MutableState, branchIndex int, releaseFn execution.ReleaseFunc, task replicationTask, + transactionManager transactionManager, + clusterMetadata cluster.Metadata, ) error { versionHistoryItem := persistence.NewVersionHistoryItem( @@ -516,12 +653,12 @@ func (r *historyReplicatorImpl) applyNonStartEventsToNoneCurrentBranchWithoutCon return err } - err = r.transactionManager.backfillWorkflow( + err = transactionManager.backfillWorkflow( ctx, task.getEventTime(), execution.NewWorkflow( ctx, - r.clusterMetadata, + clusterMetadata, context, mutableState, releaseFn, @@ -544,11 +681,12 @@ func (r *historyReplicatorImpl) applyNonStartEventsToNoneCurrentBranchWithoutCon return nil } -func (r *historyReplicatorImpl) applyNonStartEventsToNoneCurrentBranchWithContinueAsNew( +func applyNonStartEventsToNoneCurrentBranchWithContinueAsNew( ctx ctx.Context, context execution.Context, releaseFn execution.ReleaseFunc, task replicationTask, + r *historyReplicatorImpl, ) error { // workflow backfill to non current branch with continue as new @@ -589,10 +727,11 @@ func (r *historyReplicatorImpl) applyNonStartEventsToNoneCurrentBranchWithContin return nil } -func (r *historyReplicatorImpl) applyNonStartEventsMissingMutableState( +func applyNonStartEventsMissingMutableState( ctx ctx.Context, newContext execution.Context, task replicationTask, + newWorkflowResetter newWorkflowResetterFn, ) (execution.MutableState, error) { // for non reset workflow execution replication task, just do re-replication @@ -614,7 +753,7 @@ func (r *historyReplicatorImpl) applyNonStartEventsMissingMutableState( baseEventID := decisionTaskEvent.ID - 1 baseRunID, newRunID, baseEventVersion, _ := task.getWorkflowResetMetadata() - workflowResetter := r.newWorkflowResetter( + workflowResetter := newWorkflowResetter( task.getDomainID(), task.getWorkflowID(), baseRunID, @@ -641,15 +780,20 @@ func (r *historyReplicatorImpl) applyNonStartEventsMissingMutableState( return resetMutableState, nil } -func (r *historyReplicatorImpl) applyNonStartEventsResetWorkflow( +func applyNonStartEventsResetWorkflow( ctx ctx.Context, context execution.Context, mutableState execution.MutableState, task replicationTask, + newStateBuilder newStateBuilderFn, + transactionManager transactionManager, + clusterMetadata cluster.Metadata, + logger log.Logger, + shard shard.Context, ) error { requestID := uuid.New() // requestID used for start workflow execution request. This is not on the history event. - stateBuilder := r.newStateBuilder(mutableState, task.getLogger()) + stateBuilder := newStateBuilder(mutableState, task.getLogger()) _, err := stateBuilder.ApplyEvents( task.getDomainID(), requestID, @@ -667,13 +811,13 @@ func (r *historyReplicatorImpl) applyNonStartEventsResetWorkflow( targetWorkflow := execution.NewWorkflow( ctx, - r.clusterMetadata, + clusterMetadata, context, mutableState, execution.NoopReleaseFn, ) - err = r.transactionManager.createWorkflow( + err = transactionManager.createWorkflow( ctx, task.getEventTime(), targetWorkflow, @@ -684,22 +828,25 @@ func (r *historyReplicatorImpl) applyNonStartEventsResetWorkflow( tag.Error(err), ) } else { - r.notify(task.getSourceCluster(), task.getEventTime()) + notify(task.getSourceCluster(), task.getEventTime(), logger, shard, clusterMetadata) } return err } -func (r *historyReplicatorImpl) notify( +func notify( clusterName string, now time.Time, + logger log.Logger, + shard shard.Context, + clusterMetadata cluster.Metadata, ) { - if clusterName == r.clusterMetadata.GetCurrentClusterName() { + if clusterName == clusterMetadata.GetCurrentClusterName() { // this is a valid use case for testing, but not for production - r.logger.Warn("nDCHistoryReplicator applying events generated by current cluster") + logger.Warn("nDCHistoryReplicator applying events generated by current cluster") return } - now = now.Add(-r.shard.GetConfig().StandbyClusterDelay()) - r.shard.SetCurrentTime(clusterName, now) + now = now.Add(-shard.GetConfig().StandbyClusterDelay()) + shard.SetCurrentTime(clusterName, now) } func newNDCRetryTaskErrorWithHint( diff --git a/service/history/ndc/history_replicator_test.go b/service/history/ndc/history_replicator_test.go index bd55fa6a215..c04e53f1416 100644 --- a/service/history/ndc/history_replicator_test.go +++ b/service/history/ndc/history_replicator_test.go @@ -140,10 +140,10 @@ func TestNewHistoryReplicator_newBranchManager(t *testing.T) { testReplicator := NewHistoryReplicator(mockShard, testExecutionCache, mockEventsReapplier, log.NewNoop()) testReplicatorImpl := testReplicator.(*historyReplicatorImpl) - // test newBranchManager function in history replicator + // test newBranchManagerFn function in history replicator mockExecutionContext := execution.NewMockContext(ctrl) mockExecutionMutableState := execution.NewMockMutableState(ctrl) - assert.NotNil(t, testReplicatorImpl.newBranchManager(mockExecutionContext, mockExecutionMutableState, log.NewNoop())) + assert.NotNil(t, testReplicatorImpl.newBranchManagerFn(mockExecutionContext, mockExecutionMutableState, log.NewNoop())) } func TestNewHistoryReplicator_newConflictResolver(t *testing.T) { @@ -189,14 +189,14 @@ func TestNewHistoryReplicator_newConflictResolver(t *testing.T) { testReplicator := NewHistoryReplicator(mockShard, testExecutionCache, mockEventsReapplier, log.NewNoop()) testReplicatorImpl := testReplicator.(*historyReplicatorImpl) - // test newConflictResolver function in history replicator + // test newConflictResolverFn function in history replicator mockEventsCache := events.NewMockCache(ctrl) mockShard.EXPECT().GetEventsCache().Return(mockEventsCache).Times(1) mockShard.EXPECT().GetShardID().Return(0).Times(1) mockExecutionContext := execution.NewMockContext(ctrl) mockExecutionMutableState := execution.NewMockMutableState(ctrl) - assert.NotNil(t, testReplicatorImpl.newConflictResolver(mockExecutionContext, mockExecutionMutableState, log.NewNoop())) + assert.NotNil(t, testReplicatorImpl.newConflictResolverFn(mockExecutionContext, mockExecutionMutableState, log.NewNoop())) } func TestNewHistoryReplicator_newWorkflowResetter(t *testing.T) { @@ -242,13 +242,13 @@ func TestNewHistoryReplicator_newWorkflowResetter(t *testing.T) { testReplicator := NewHistoryReplicator(mockShard, testExecutionCache, mockEventsReapplier, log.NewNoop()) testReplicatorImpl := testReplicator.(*historyReplicatorImpl) - // test newWorkflowResetter function in history replicator + // test newWorkflowResetterFn function in history replicator mockEventsCache := events.NewMockCache(ctrl) mockShard.EXPECT().GetEventsCache().Return(mockEventsCache).Times(1) mockShard.EXPECT().GetShardID().Return(0).Times(1) mockExecutionContext := execution.NewMockContext(ctrl) - assert.NotNil(t, testReplicatorImpl.newWorkflowResetter( + assert.NotNil(t, testReplicatorImpl.newWorkflowResetterFn( "test-domain-id", "test-workflow-id", "test-base-run-id", @@ -301,9 +301,9 @@ func TestNewHistoryReplicator_newStateBuilder(t *testing.T) { testReplicator := NewHistoryReplicator(mockShard, testExecutionCache, mockEventsReapplier, log.NewNoop()) testReplicatorImpl := testReplicator.(*historyReplicatorImpl) - // test newStateBuilder function in history replicator + // test newStateBuilderFn function in history replicator mockExecutionMutableState := execution.NewMockMutableState(ctrl) - assert.NotNil(t, testReplicatorImpl.newStateBuilder(mockExecutionMutableState, log.NewNoop())) + assert.NotNil(t, testReplicatorImpl.newStateBuilderFn(mockExecutionMutableState, log.NewNoop())) } func TestNewHistoryReplicator_newMutableState(t *testing.T) { @@ -349,7 +349,7 @@ func TestNewHistoryReplicator_newMutableState(t *testing.T) { testReplicator := NewHistoryReplicator(mockShard, testExecutionCache, mockEventsReapplier, log.NewNoop()) testReplicatorImpl := testReplicator.(*historyReplicatorImpl) - // test newMutableState function in history replicator + // test newMutableStateFn function in history replicator deadline := int64(0) mockShard.EXPECT().GetTimeSource().Return(clock.NewMockedTimeSource()).Times(1) mockEventsCache := events.NewMockCache(ctrl) @@ -362,7 +362,7 @@ func TestNewHistoryReplicator_newMutableState(t *testing.T) { 0, &deadline, ) - assert.NotNil(t, testReplicatorImpl.newMutableState(mockDomainCacheEntry, log.NewNoop())) + assert.NotNil(t, testReplicatorImpl.newMutableStateFn(mockDomainCacheEntry, log.NewNoop())) } func TestApplyEvents(t *testing.T) {