Skip to content

Commit

Permalink
Merge branch 'master' into domain-mirror
Browse files Browse the repository at this point in the history
  • Loading branch information
Wenquan Xing committed May 25, 2018
2 parents bc87400 + b9fd9fa commit ba13d2b
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 31 deletions.
4 changes: 3 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ make bins

## Testing

Before running the tests you must have `cassandra` running locally:
Before running the tests you must have `cassandra` and `kafka` running locally:

```bash
# for OS X
Expand All @@ -62,6 +62,8 @@ brew install cassandra
/usr/local/bin/cassandra
```

To run kafka, follow kafka quickstart guide [here](https://kafka.apache.org/quickstart)

Run all the tests:

```bash
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Business logic is modeled as workflows and activities. Workflows are the impleme

The Cadence server brokers and persists tasks and events generated during workflow execution, which provides certain scalability and realiability guarantees for workflow executions. An individual activity execution is not fault tolerant as it can fail for various reasons. But the workflow that defines in which order and how (location, input parameters, timeouts, etc.) activities are executed is guaranteed to continue execution under various failure conditions.

This repo contains the source code of the Cadence server. The client lib you can use to implement workflows, activities and worker can be found [here](https://github.com/uber-go/cadence-client).
This repo contains the source code of the Cadence server. To implement workflows, activities and worker use [Go client](https://github.com/uber-go/cadence-client) or [Java client](https://github.com/uber-java/cadence-client).

See Maxim's talk at [Data@Scale Conference](https://atscaleconference.com/videos/cadence-microservice-architecture-beyond-requestreply) for an architectural overview of Cadence.

Expand Down Expand Up @@ -53,4 +53,4 @@ We'd love your help in making Cadence great. Please review our [instructions](CO
## License

MIT License, please see [LICENSE](https://github.com/uber/cadence/blob/master/LICENSE) for details.


69 changes: 41 additions & 28 deletions service/history/historyReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,21 +78,33 @@ func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) (retE

execution := *request.WorkflowExecution

var context *workflowExecutionContext
context, release, err := r.historyCache.getOrCreateWorkflowExecution(domainID, execution)
if err != nil {
return err
}
defer func() { release(retError) }()

var msBuilder *mutableStateBuilder
firstEvent := request.History.Events[0]
switch firstEvent.GetEventType() {
case shared.EventTypeWorkflowExecutionStarted:
msBuilder = newMutableStateBuilderWithReplicationState(r.shard.GetConfig(), r.logger, request.GetVersion())
msBuilder, err = context.loadWorkflowExecution()
if err == nil {
// Workflow execution already exist, looks like a duplicate start event, it is safe to ignore it
r.logger.Infof("Dropping stale replication task for start event. WorkflowID: %v, RunID: %v, Version: %v",
execution.GetWorkflowId(), execution.GetRunId(), request.GetVersion())
return nil
}

default:
var release releaseWorkflowExecutionFunc
context, release, err = r.historyCache.getOrCreateWorkflowExecution(domainID, execution)
if err != nil {
// GetWorkflowExecution failed with some transient error. Return err so we can retry the task later
if _, ok := err.(*shared.EntityNotExistsError); !ok {
return err
}
defer func() { release(retError) }()

// WorkflowExecution does not exist, lets proceed with processing of the task
msBuilder = newMutableStateBuilderWithReplicationState(r.shard.GetConfig(), r.logger, request.GetVersion())

default:
msBuilder, err = context.loadWorkflowExecution()
if err != nil {
return err
Expand Down Expand Up @@ -305,33 +317,34 @@ func (r *historyReplicator) ApplyReplicationTask(context *workflowExecutionConte
// The failover version checking && overwriting should be performed here: #675
// TODO

workflowExistsErrHandler := func(err *persistence.WorkflowExecutionAlreadyStartedError) error {
// set the prev run ID for database conditional update
prevRunID := err.RunID
prevState := err.State
if prevState != persistence.WorkflowStateCompleted {
if prevRunID == execution.GetRunId() {
// this is a duplicate execution of the start execution event
// or this is an execution of the duplicate start execution event
return nil
}
return err
}
// if the existing workflow is completed, ignore the worklow ID reuse policy
// since the policy should be applied by the active cluster,
// standby cluster should apply this event without question.
return nil
}

// try to create the workflow execution
isBrandNew := true
_, err = createWorkflow(isBrandNew, "")
// if err still non nil, see if retry
if errExist, ok := err.(*persistence.WorkflowExecutionAlreadyStartedError); ok {
if err = workflowExistsErrHandler(errExist); err == nil {
isBrandNew = false
_, err = createWorkflow(isBrandNew, errExist.RunID)
prevRunID := errExist.RunID
prevState := errExist.State

// Check for duplicate processing of StartWorkflowExecution replication task
if prevRunID == execution.GetRunId() {
r.logger.Infof("Dropping stale replication task for start event. WorkflowID: %v, RunID: %v, Version: %v",
execution.GetWorkflowId(), execution.GetRunId(), request.GetVersion())
return nil
}

// Some other workflow is running, for now let's keep on retrying this replication event by an error
// to wait for current run to finish so this event could be applied.
// TODO: We also need to deal with conflict resolution when workflow with same ID is started on 2 different
// clusters.
if prevState != persistence.WorkflowStateCompleted {
return err
}

// if the existing workflow is completed, ignore the worklow ID reuse policy
// since the policy should be applied by the active cluster,
// standby cluster should apply this event without question.
isBrandNew = false
_, err = createWorkflow(isBrandNew, errExist.RunID)
}

default:
Expand Down

0 comments on commit ba13d2b

Please sign in to comment.