Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify XDC test logic #4522

Merged
merged 2 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions common/persistence/tests/cassandra_test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"sort"
"strings"
"testing"
"time"

"github.com/blang/semver/v4"
"github.com/gocql/gocql"
Expand Down Expand Up @@ -241,10 +242,11 @@ func GetSchemaFiles(schemaDir string, logger log.Logger) []string {
// NewCassandraConfig returns a new Cassandra config for test
func NewCassandraConfig() *config.Cassandra {
return &config.Cassandra{
User: testCassandraUser,
Password: testCassandraPassword,
Hosts: environment.GetCassandraAddress(),
Port: environment.GetCassandraPort(),
Keyspace: testCassandraDatabaseNamePrefix + shuffle.String(testCassandraDatabaseNameSuffix),
User: testCassandraUser,
Password: testCassandraPassword,
Hosts: environment.GetCassandraAddress(),
Port: environment.GetCassandraPort(),
Keyspace: testCassandraDatabaseNamePrefix + shuffle.String(testCassandraDatabaseNameSuffix),
ConnectTimeout: 30 * time.Second,
}
}
6 changes: 1 addition & 5 deletions service/history/ndc/history_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func (r *HistoryReplicatorImpl) ApplyWorkflowState(
return err
}

lastEventTime, lastFirstTxnID, err := r.backfillHistory(
_, lastFirstTxnID, err := r.backfillHistory(
ctx,
request.GetRemoteCluster(),
namespaceID,
Expand Down Expand Up @@ -377,7 +377,6 @@ func (r *HistoryReplicatorImpl) ApplyWorkflowState(
}
return r.transactionMgr.createWorkflow(
ctx,
timestamp.TimeValue(lastEventTime),
NewWorkflow(
ctx,
r.namespaceRegistry,
Expand Down Expand Up @@ -499,7 +498,6 @@ func (r *HistoryReplicatorImpl) applyStartEvents(

err = r.transactionMgr.createWorkflow(
ctx,
task.getEventTime(),
NewWorkflow(
ctx,
r.namespaceRegistry,
Expand Down Expand Up @@ -637,7 +635,6 @@ func (r *HistoryReplicatorImpl) applyNonStartEventsToCurrentBranch(

err = r.transactionMgr.updateWorkflow(
ctx,
task.getEventTime(),
isRebuilt,
targetWorkflow,
newWorkflow,
Expand Down Expand Up @@ -883,7 +880,6 @@ func (r *HistoryReplicatorImpl) applyNonStartEventsResetWorkflow(

err = r.transactionMgr.createWorkflow(
ctx,
task.getEventTime(),
targetWorkflow,
)
if err != nil {
Expand Down
7 changes: 0 additions & 7 deletions service/history/ndc/transaction_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ package ndc

import (
"context"
"time"

"github.com/pborman/uuid"
commonpb "go.temporal.io/api/common/v1"
Expand Down Expand Up @@ -117,12 +116,10 @@ type (
transactionMgr interface {
createWorkflow(
ctx context.Context,
now time.Time,
targetWorkflow Workflow,
) error
updateWorkflow(
ctx context.Context,
now time.Time,
isWorkflowRebuilt bool,
targetWorkflow Workflow,
newWorkflow Workflow,
Expand Down Expand Up @@ -204,28 +201,24 @@ func newTransactionMgr(

func (r *transactionMgrImpl) createWorkflow(
ctx context.Context,
now time.Time,
targetWorkflow Workflow,
) error {

return r.createMgr.dispatchForNewWorkflow(
ctx,
now,
targetWorkflow,
)
}

func (r *transactionMgrImpl) updateWorkflow(
ctx context.Context,
now time.Time,
isWorkflowRebuilt bool,
targetWorkflow Workflow,
newWorkflow Workflow,
) error {

return r.updateMgr.dispatchForExistingWorkflow(
ctx,
now,
isWorkflowRebuilt,
targetWorkflow,
newWorkflow,
Expand Down
24 changes: 0 additions & 24 deletions service/history/ndc/transaction_manager_existing_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ package ndc
import (
"context"
"fmt"
"time"

"go.temporal.io/api/serviceerror"

Expand All @@ -42,7 +41,6 @@ type (
transactionMgrForExistingWorkflow interface {
dispatchForExistingWorkflow(
ctx context.Context,
now time.Time,
isWorkflowRebuilt bool,
targetWorkflow Workflow,
newWorkflow Workflow,
Expand All @@ -67,7 +65,6 @@ func newNDCTransactionMgrForExistingWorkflow(

func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchForExistingWorkflow(
ctx context.Context,
now time.Time,
isWorkflowRebuilt bool,
targetWorkflow Workflow,
newWorkflow Workflow,
Expand All @@ -84,7 +81,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchForExistingWorkflow(
// update to current record, since target workflow is pointed by current record
return r.dispatchWorkflowUpdateAsCurrent(
ctx,
now,
isWorkflowRebuilt,
targetWorkflow,
newWorkflow,
Expand Down Expand Up @@ -120,7 +116,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchForExistingWorkflow(
// update to current record, since target workflow is pointed by current record
return r.dispatchWorkflowUpdateAsCurrent(
ctx,
now,
isWorkflowRebuilt,
targetWorkflow,
newWorkflow,
Expand All @@ -147,7 +142,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchForExistingWorkflow(
// target workflow is older than current workflow, need to suppress the target workflow
return r.dispatchWorkflowUpdateAsZombie(
ctx,
now,
isWorkflowRebuilt,
currentWorkflow,
targetWorkflow,
Expand All @@ -159,7 +153,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchForExistingWorkflow(
// will set target workflow using snapshot
return r.executeTransaction(
ctx,
now,
nDCTransactionPolicySuppressCurrentAndUpdateAsCurrent,
currentWorkflow,
targetWorkflow,
Expand All @@ -169,7 +162,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchForExistingWorkflow(

func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchWorkflowUpdateAsCurrent(
ctx context.Context,
now time.Time,
isWorkflowRebuilt bool,
targetWorkflow Workflow,
newWorkflow Workflow,
Expand All @@ -178,7 +170,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchWorkflowUpdateAsCurre
if !isWorkflowRebuilt {
return r.executeTransaction(
ctx,
now,
nDCTransactionPolicyUpdateAsCurrent,
nil,
targetWorkflow,
Expand All @@ -188,7 +179,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchWorkflowUpdateAsCurre

return r.executeTransaction(
ctx,
now,
nDCTransactionPolicyConflictResolveAsCurrent,
nil,
targetWorkflow,
Expand All @@ -198,7 +188,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchWorkflowUpdateAsCurre

func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchWorkflowUpdateAsZombie(
ctx context.Context,
now time.Time,
isWorkflowRebuilt bool,
currentWorkflow Workflow,
targetWorkflow Workflow,
Expand All @@ -208,7 +197,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchWorkflowUpdateAsZombi
if !isWorkflowRebuilt {
return r.executeTransaction(
ctx,
now,
nDCTransactionPolicyUpdateAsZombie,
currentWorkflow,
targetWorkflow,
Expand All @@ -218,7 +206,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchWorkflowUpdateAsZombi

return r.executeTransaction(
ctx,
now,
nDCTransactionPolicyConflictResolveAsZombie,
currentWorkflow,
targetWorkflow,
Expand All @@ -228,7 +215,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) dispatchWorkflowUpdateAsZombi

func (r *nDCTransactionMgrForExistingWorkflowImpl) updateAsCurrent(
ctx context.Context,
now time.Time,
targetWorkflow Workflow,
newWorkflow Workflow,
) error {
Expand All @@ -246,7 +232,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) updateAsCurrent(

func (r *nDCTransactionMgrForExistingWorkflowImpl) updateAsZombie(
ctx context.Context,
now time.Time,
currentWorkflow Workflow,
targetWorkflow Workflow,
newWorkflow Workflow,
Expand Down Expand Up @@ -319,7 +304,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) updateAsZombie(

func (r *nDCTransactionMgrForExistingWorkflowImpl) suppressCurrentAndUpdateAsCurrent(
ctx context.Context,
now time.Time,
currentWorkflow Workflow,
targetWorkflow Workflow,
newWorkflow Workflow,
Expand Down Expand Up @@ -364,7 +348,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) suppressCurrentAndUpdateAsCur

func (r *nDCTransactionMgrForExistingWorkflowImpl) conflictResolveAsCurrent(
ctx context.Context,
now time.Time,
targetWorkflow Workflow,
newWorkflow Workflow,
) error {
Expand All @@ -390,7 +373,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) conflictResolveAsCurrent(

func (r *nDCTransactionMgrForExistingWorkflowImpl) conflictResolveAsZombie(
ctx context.Context,
now time.Time,
currentWorkflow Workflow,
targetWorkflow Workflow,
newWorkflow Workflow,
Expand Down Expand Up @@ -462,7 +444,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) conflictResolveAsZombie(

func (r *nDCTransactionMgrForExistingWorkflowImpl) executeTransaction(
ctx context.Context,
now time.Time,
transactionPolicy nDCTransactionPolicy,
currentWorkflow Workflow,
targetWorkflow Workflow,
Expand All @@ -482,15 +463,13 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) executeTransaction(
case nDCTransactionPolicyUpdateAsCurrent:
return r.updateAsCurrent(
ctx,
now,
targetWorkflow,
newWorkflow,
)

case nDCTransactionPolicyUpdateAsZombie:
return r.updateAsZombie(
ctx,
now,
currentWorkflow,
targetWorkflow,
newWorkflow,
Expand All @@ -499,7 +478,6 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) executeTransaction(
case nDCTransactionPolicySuppressCurrentAndUpdateAsCurrent:
return r.suppressCurrentAndUpdateAsCurrent(
ctx,
now,
currentWorkflow,
targetWorkflow,
newWorkflow,
Expand All @@ -508,15 +486,13 @@ func (r *nDCTransactionMgrForExistingWorkflowImpl) executeTransaction(
case nDCTransactionPolicyConflictResolveAsCurrent:
return r.conflictResolveAsCurrent(
ctx,
now,
targetWorkflow,
newWorkflow,
)

case nDCTransactionPolicyConflictResolveAsZombie:
return r.conflictResolveAsZombie(
ctx,
now,
currentWorkflow,
targetWorkflow,
newWorkflow,
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading