Skip to content

Commit

Permalink
CrossDC bugfixes to replication task generation and conflict resolver (
Browse files Browse the repository at this point in the history
…#799)

Replication information captured for replication task has the wrong
last event id, as it is captured before CloseSession on mutable state.

Update stale replication task detection to use LastWriteEventID instead
of Current version.  Otherwise this results in throwing away tasks when
they could potentially be applied.

Fix broken conflict resolution logic to set mutable state fields like
next event id, lastFirstEventID, etc which were not set as a result of
applying history events and need to be set explicitly.
  • Loading branch information
samarabbas authored May 30, 2018
1 parent ad90819 commit 4ae0147
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 37 deletions.
66 changes: 49 additions & 17 deletions service/history/conflictResolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package history

import (
"time"

"github.com/pborman/uuid"
"github.com/uber-common/bark"
"github.com/uber/cadence/.gen/go/shared"
Expand Down Expand Up @@ -50,7 +52,7 @@ func newConflictResolver(shard ShardContext, context *workflowExecutionContext,
}
}

func (r *conflictResolver) reset(replayEventID int64) (*mutableStateBuilder, error) {
func (r *conflictResolver) reset(replayEventID int64, startTime time.Time) (*mutableStateBuilder, error) {
domainID := r.context.domainID
execution := r.context.workflowExecution
replayNextEventID := replayEventID + 1
Expand All @@ -59,34 +61,60 @@ func (r *conflictResolver) reset(replayEventID int64) (*mutableStateBuilder, err
var err error
var resetMutableStateBuilder *mutableStateBuilder
var sBuilder *stateBuilder
var lastFirstEventID int64
eventsToApply := replayNextEventID - common.FirstEventID
requestID := uuid.New()
for hasMore := true; hasMore; hasMore = len(nextPageToken) > 0 {
history, nextPageToken, err = r.getHistory(domainID, execution, common.FirstEventID, replayNextEventID,
nextPageToken)
history, nextPageToken, lastFirstEventID, err = r.getHistory(domainID, execution, common.FirstEventID,
replayNextEventID, nextPageToken)
r.logger.Debugf("Conflict Resolver GetHistory. History Length: %v, token: %v, err: %v",
len(history.Events), nextPageToken, err)
if err != nil {
return nil, err
}

for _, event := range history.Events {
if event.GetEventId() == common.FirstEventID {
resetMutableStateBuilder = newMutableStateBuilderWithReplicationState(r.shard.GetConfig(), r.logger,
event.GetVersion())
batchSize := int64(len(history.Events))
// NextEventID could be in the middle of the batch. Trim the history events to not have more events then what
// need to be applied
if batchSize > eventsToApply {
history.Events = history.Events[0:eventsToApply]
}

eventsToApply -= batchSize

if len(history.Events) == 0 {
break
}

firstEvent := history.Events[0]
if firstEvent.GetEventId() == common.FirstEventID {
resetMutableStateBuilder = newMutableStateBuilderWithReplicationState(r.shard.GetConfig(), r.logger,
firstEvent.GetVersion())

sBuilder = newStateBuilder(r.shard, resetMutableStateBuilder, r.logger)
}
sBuilder = newStateBuilder(r.shard, resetMutableStateBuilder, r.logger)
}

_, _, _, err = sBuilder.applyEvents(common.EmptyVersion, "", domainID, requestID, execution, history, nil)
if err != nil {
return nil, err
}
_, _, _, err = sBuilder.applyEvents(common.EmptyVersion, "", domainID, requestID, execution, history, nil)
if err != nil {
return nil, err
}
resetMutableStateBuilder.executionInfo.LastFirstEventID = lastFirstEventID
}

// Applying events to mutableState does not move the nextEventID. Explicitly set nextEventID to new value
resetMutableStateBuilder.executionInfo.NextEventID = replayNextEventID
resetMutableStateBuilder.executionInfo.StartTimestamp = startTime
// the last updated time is not important here, since this should be updated with event time afterwards
resetMutableStateBuilder.executionInfo.LastUpdatedTimestamp = startTime

r.logger.Infof("All events applied for execution. WorkflowID: %v, RunID: %v, NextEventID: %v",
execution.GetWorkflowId(), execution.GetRunId(), resetMutableStateBuilder.GetNextEventID())

return r.context.resetWorkflowExecution(resetMutableStateBuilder)
}

func (r *conflictResolver) getHistory(domainID string, execution shared.WorkflowExecution, firstEventID,
nextEventID int64, nextPageToken []byte) (*shared.History, []byte, error) {
nextEventID int64, nextPageToken []byte) (*shared.History, []byte, int64, error) {

response, err := r.historyMgr.GetWorkflowExecutionHistory(&persistence.GetWorkflowExecutionHistoryRequest{
DomainID: domainID,
Expand All @@ -98,21 +126,25 @@ func (r *conflictResolver) getHistory(domainID string, execution shared.Workflow
})

if err != nil {
return nil, nil, err
return nil, nil, common.EmptyEventID, err
}

lastFirstEventID := common.EmptyEventID
historyEvents := []*shared.HistoryEvent{}
for _, e := range response.Events {
persistence.SetSerializedHistoryDefaults(&e)
s, _ := r.hSerializerFactory.Get(e.EncodingType)
history, err1 := s.Deserialize(&e)
if err1 != nil {
return nil, nil, err1
return nil, nil, common.EmptyEventID, err1
}
if len(history.Events) > 0 {
lastFirstEventID = history.Events[0].GetEventId()
}
historyEvents = append(historyEvents, history.Events...)
}

executionHistory := &shared.History{}
executionHistory.Events = historyEvents
return executionHistory, nextPageToken, nil
return executionHistory, nextPageToken, lastFirstEventID, nil
}
22 changes: 13 additions & 9 deletions service/history/historyReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,34 +112,38 @@ func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) (retE

rState := msBuilder.replicationState
// Check if this is a stale event
if rState.CurrentVersion > request.GetVersion() {
if rState.LastWriteVersion > request.GetVersion() {
// Replication state is already on a higher version, we can drop this event
// TODO: We need to replay external events like signal to the new version
r.logger.Warnf("Dropping stale replication task. Current Version: %v, Task Version: %v", rState.CurrentVersion,
request.GetVersion())
r.logger.Warnf("Dropping stale replication task. LastWriteV: %v, CurrentV: %v, TaskV: %v",
rState.LastWriteVersion, rState.CurrentVersion, request.GetVersion())
return nil
}

// Check if this is the first event after failover
if rState.LastWriteVersion < request.GetVersion() {
r.logger.Infof("First Event after replication. WorkflowID: %v, RunID: %v, CurrentV: %v, LastWriteV: %v, LastWriteEvent: %v",
execution.GetWorkflowId(), execution.GetRunId(), rState.CurrentVersion, rState.LastWriteVersion, rState.LastWriteEventID)
previousActiveCluster := r.metadataMgr.ClusterNameForFailoverVersion(rState.LastWriteVersion)
ri, ok := request.ReplicationInfo[previousActiveCluster]
if !ok {
r.logger.Errorf("No replication information found for previous active cluster. Previous: %v, Current: %v",
previousActiveCluster, request.GetSourceCluster())
r.logger.Errorf("No replication information found for previous active cluster. Previous: %v, Request: %v, ReplicationInfo: %v",
previousActiveCluster, request.GetSourceCluster(), request.ReplicationInfo)

// TODO: Handle missing replication information
return nil
}

// Detect conflict
if ri.GetLastEventId() != rState.LastWriteEventID {
r.logger.Infof("Conflict detected. State: {Version: %, LastWriteEventID: %v}, Task: {SourceCluster: %v, Version: %v, LastEventID: %v}",
rState.CurrentVersion, rState.LastWriteEventID, request.GetSourceCluster(), ri.GetVersion(),
ri.GetLastEventId())
r.logger.Infof("Conflict detected. State: {V: %v, LastWriteV: %v, LastWriteEvent: %v}, ReplicationInfo: {PrevC: %v, V: %v, LastEvent: %v}, Task: {SourceC: %v, V: %v, First: %v, Next: %v}",
rState.CurrentVersion, rState.LastWriteVersion, rState.LastWriteEventID,
previousActiveCluster, ri.GetVersion(), ri.GetLastEventId(),
request.GetSourceCluster(), request.GetVersion(), request.GetFirstEventId(), request.GetNextEventId())

resolver := newConflictResolver(r.shard, context, r.historyMgr, r.logger)
msBuilder, err = resolver.reset(ri.GetLastEventId())
msBuilder, err = resolver.reset(ri.GetLastEventId(), msBuilder.executionInfo.StartTimestamp)
r.logger.Infof("Completed Resetting of workflow execution: Err: %v", err)
if err != nil {
return err
}
Expand Down
25 changes: 14 additions & 11 deletions service/history/workflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,11 @@ func (c *workflowExecutionContext) updateWorkflowExecutionWithDeleteTask(transfe

func (c *workflowExecutionContext) replicateWorkflowExecution(request *h.ReplicateEventsRequest,
transferTasks []persistence.Task, timerTasks []persistence.Task, lastEventID, transactionID int64) error {

nextEventID := lastEventID + 1
c.msBuilder.updateReplicationStateLastEventID(request.GetSourceCluster(), lastEventID)
c.msBuilder.executionInfo.NextEventID = nextEventID

builder := newHistoryBuilderFromEvents(request.History.Events, c.logger)
return c.updateHelper(builder, transferTasks, timerTasks, false, transactionID)
return c.updateHelper(builder, transferTasks, timerTasks, false, request.GetSourceCluster(), transactionID)
}

func (c *workflowExecutionContext) updateVersion() error {
Expand All @@ -166,17 +164,13 @@ func (c *workflowExecutionContext) updateVersion() error {
func (c *workflowExecutionContext) updateWorkflowExecution(transferTasks []persistence.Task,
timerTasks []persistence.Task, transactionID int64) error {

crossDCEnabled := c.msBuilder.replicationState != nil
if crossDCEnabled {
lastEventID := c.msBuilder.GetNextEventID() - 1
c.msBuilder.updateReplicationStateLastEventID("", lastEventID)
}

return c.updateHelper(nil, transferTasks, timerTasks, crossDCEnabled, transactionID)
// Only generate replication task if this is a global domain
createReplicationTask := c.msBuilder.replicationState != nil
return c.updateHelper(nil, transferTasks, timerTasks, createReplicationTask, "", transactionID)
}

func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transferTasks []persistence.Task,
timerTasks []persistence.Task, createReplicationTask bool,
timerTasks []persistence.Task, createReplicationTask bool, sourceCluster string,
transactionID int64) (errRet error) {

defer func() {
Expand All @@ -192,6 +186,15 @@ func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transfe
return err
}

// Replication state should only be updated after the UpdateSession is closed. IDs for certain events are only
// generated on CloseSession as they could be buffered events. The value for NextEventID will be wrong on
// mutable state if read before flushing the buffered events.
crossDCEnabled := c.msBuilder.replicationState != nil
if crossDCEnabled {
lastEventID := c.msBuilder.GetNextEventID() - 1
c.msBuilder.updateReplicationStateLastEventID(sourceCluster, lastEventID)
}

// Replicator passes in a custom builder as it already has the events
if builder == nil {
// If no builder is passed in then use the one as part of the updates
Expand Down

0 comments on commit 4ae0147

Please sign in to comment.