Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ut conflict resolver #806

Merged
merged 9 commits into from
Jun 5, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions common/logging/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ const (
TagOffset = "offset"
TagScope = "scope"
TagFailover = "failover"
TagVersion = "version"
TagFirstEventID = "first-event-id"
TagNextEventID = "next-event-id"

// workflow logging tag values
// TagWorkflowComponent Values
Expand All @@ -63,6 +66,7 @@ const (
TagValueMatchingEngineComponent = "matching-engine"
TagValueReplicatorComponent = "replicator"
TagValueReplicationTaskProcessorComponent = "replication-task-processor"
TagValueHistoryReplicatorComponent = "history-replicator"

// TagHistoryBuilderAction values
TagValueActionWorkflowStarted = "add-workflowexecution-started-event"
Expand Down
4 changes: 3 additions & 1 deletion service/history/conflictResolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (r *conflictResolver) reset(requestID string, replayEventID int64, startTim
var resetMutableStateBuilder *mutableStateBuilder
var sBuilder *stateBuilder
var lastFirstEventID int64
var lastEvent *shared.HistoryEvent
eventsToApply := replayNextEventID - common.FirstEventID
for hasMore := true; hasMore; hasMore = len(nextPageToken) > 0 {
history, nextPageToken, lastFirstEventID, err = r.getHistory(domainID, execution, common.FirstEventID,
Expand All @@ -88,6 +89,7 @@ func (r *conflictResolver) reset(requestID string, replayEventID int64, startTim
}

firstEvent := history.Events[0]
lastEvent = history.Events[len(history.Events)-1]
if firstEvent.GetEventId() == common.FirstEventID {
resetMutableStateBuilder = newMutableStateBuilderWithReplicationState(r.shard.GetConfig(), r.logger,
firstEvent.GetVersion())
Expand All @@ -109,7 +111,7 @@ func (r *conflictResolver) reset(requestID string, replayEventID int64, startTim
resetMutableStateBuilder.executionInfo.LastUpdatedTimestamp = startTime

sourceCluster := r.clusterMetadata.ClusterNameForFailoverVersion(resetMutableStateBuilder.GetCurrentVersion())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should read the version from last event rather than rely on CurrentVersion.

resetMutableStateBuilder.updateReplicationStateLastEventID(sourceCluster, replayEventID)
resetMutableStateBuilder.updateReplicationStateLastEventID(sourceCluster, lastEvent.GetVersion(), replayEventID)

r.logger.Infof("All events applied for execution. WorkflowID: %v, RunID: %v, NextEventID: %v",
execution.GetWorkflowId(), execution.GetRunId(), resetMutableStateBuilder.GetNextEventID())
Expand Down
45 changes: 33 additions & 12 deletions service/history/historyReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func newHistoryReplicator(shard ShardContext, historyEngine *historyEngineImpl,
historyMgr: historyMgr,
historySerializer: persistence.NewJSONHistorySerializer(),
metadataMgr: shard.GetService().GetClusterMetadata(),
logger: logger,
logger: logger.WithField(logging.TagWorkflowComponent, logging.TagValueHistoryReplicatorComponent),
}

return replicator
Expand All @@ -84,15 +84,22 @@ func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) (retE
}
defer func() { release(retError) }()

logger := r.logger.WithFields(bark.Fields{
logging.TagWorkflowExecutionID: execution.GetWorkflowId(),
logging.TagWorkflowRunID: execution.GetRunId(),
logging.TagSourceCluster: request.GetSourceCluster(),
logging.TagVersion: request.GetVersion(),
logging.TagFirstEventID: request.GetFirstEventId(),
logging.TagNextEventID: request.GetNextEventId(),
})
var msBuilder *mutableStateBuilder
firstEvent := request.History.Events[0]
switch firstEvent.GetEventType() {
case shared.EventTypeWorkflowExecutionStarted:
msBuilder, err = context.loadWorkflowExecution()
if err == nil {
// Workflow execution already exist, looks like a duplicate start event, it is safe to ignore it
r.logger.Infof("Dropping stale replication task for start event. WorkflowID: %v, RunID: %v, Version: %v",
execution.GetWorkflowId(), execution.GetRunId(), request.GetVersion())
logger.Info("Dropping stale replication task for start event.")
return nil
}

Expand All @@ -115,19 +122,19 @@ func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) (retE
if rState.LastWriteVersion > request.GetVersion() {
// Replication state is already on a higher version, we can drop this event
// TODO: We need to replay external events like signal to the new version
r.logger.Warnf("Dropping stale replication task. LastWriteV: %v, CurrentV: %v, TaskV: %v",
rState.LastWriteVersion, rState.CurrentVersion, request.GetVersion())
logger.Warnf("Dropping stale replication task. CurrentV: %v, LastWriteV: %v, LastWriteEvent: %v",
rState.CurrentVersion, rState.LastWriteVersion, rState.LastWriteEventID)
return nil
}

// Check if this is the first event after failover
if rState.LastWriteVersion < request.GetVersion() {
r.logger.Infof("First Event after replication. WorkflowID: %v, RunID: %v, CurrentV: %v, LastWriteV: %v, LastWriteEvent: %v",
execution.GetWorkflowId(), execution.GetRunId(), rState.CurrentVersion, rState.LastWriteVersion, rState.LastWriteEventID)
logger.Infof("First Event after replication. CurrentV: %v, LastWriteV: %v, LastWriteEvent: %v",
rState.CurrentVersion, rState.LastWriteVersion, rState.LastWriteEventID)
previousActiveCluster := r.metadataMgr.ClusterNameForFailoverVersion(rState.LastWriteVersion)
ri, ok := request.ReplicationInfo[previousActiveCluster]
if !ok {
r.logger.Errorf("No replication information found for previous active cluster. Previous: %v, Request: %v, ReplicationInfo: %v",
logger.Errorf("No replication information found for previous active cluster. Previous: %v, Request: %v, ReplicationInfo: %v",
previousActiveCluster, request.GetSourceCluster(), request.ReplicationInfo)

// TODO: Handle missing replication information
Expand All @@ -136,14 +143,13 @@ func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) (retE

// Detect conflict
if ri.GetLastEventId() != rState.LastWriteEventID {
r.logger.Infof("Conflict detected. State: {V: %v, LastWriteV: %v, LastWriteEvent: %v}, ReplicationInfo: {PrevC: %v, V: %v, LastEvent: %v}, Task: {SourceC: %v, V: %v, First: %v, Next: %v}",
logger.Infof("Conflict detected. State: {V: %v, LastWriteV: %v, LastWriteEvent: %v}, ReplicationInfo: {PrevC: %v, V: %v, LastEvent: %v}",
rState.CurrentVersion, rState.LastWriteVersion, rState.LastWriteEventID,
previousActiveCluster, ri.GetVersion(), ri.GetLastEventId(),
request.GetSourceCluster(), request.GetVersion(), request.GetFirstEventId(), request.GetNextEventId())
previousActiveCluster, ri.GetVersion(), ri.GetLastEventId())

resolver := newConflictResolver(r.shard, context, r.historyMgr, r.logger)
msBuilder, err = resolver.reset(uuid.New(), ri.GetLastEventId(), msBuilder.executionInfo.StartTimestamp)
r.logger.Infof("Completed Resetting of workflow execution: Err: %v", err)
logger.Infof("Completed Resetting of workflow execution. NextEventID:%v. Err: %v", msBuilder.GetNextEventID(), err)
if err != nil {
return err
}
Expand All @@ -152,12 +158,19 @@ func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) (retE

// Check for duplicate processing of replication task
if firstEvent.GetEventId() < msBuilder.GetNextEventID() {
logger.Warnf("Dropping replication task. State: {NextEvent: %v, Version: %v, LastWriteV: %v, LastWriteEvent: %v}",
msBuilder.GetNextEventID(), msBuilder.replicationState.CurrentVersion,
msBuilder.replicationState.LastWriteVersion, msBuilder.replicationState.LastWriteEventID)
return nil
}

// Check for out of order replication task and store it in the buffer
if firstEvent.GetEventId() > msBuilder.GetNextEventID() {
logger.Infof("Buffer out of order replication task. NextEvent: %v, FirstEvent: %v",
msBuilder.GetNextEventID(), firstEvent.GetEventId())

if err := msBuilder.BufferReplicationTask(request); err != nil {
logger.Errorf("Failed to buffer out of order replication task. Err: %v", err)
return errors.New("failed to add buffered replication task")
}

Expand All @@ -168,17 +181,25 @@ func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) (retE
// First check if there are events which needs to be flushed before applying the update
err = r.FlushBuffer(context, msBuilder, request)
if err != nil {
logger.Errorf("Fail to flush buffer. NextEvent: %v, FirstEvent: %v, Err: %v", msBuilder.GetNextEventID(),
firstEvent.GetEventId(), err)
return err
}

// Apply the replication task
err = r.ApplyReplicationTask(context, msBuilder, request)
if err != nil {
logger.Errorf("Fail to Apply Replication task. NextEvent: %v, FirstEvent: %v, Err: %v", msBuilder.GetNextEventID(),
firstEvent.GetEventId(), err)
return err
}

// Flush buffered replication tasks after applying the update
err = r.FlushBuffer(context, msBuilder, request)
if err != nil {
logger.Errorf("Fail to flush buffer. NextEvent: %v, FirstEvent: %v, Err: %v", msBuilder.GetNextEventID(),
firstEvent.GetEventId(), err)
}

return err
}
Expand Down
6 changes: 4 additions & 2 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,9 @@ func (e *mutableStateBuilder) updateReplicationStateVersion(version int64) {

// Assumption: It is expected CurrentVersion on replication state is updated at the start of transaction when
// mutableState is loaded for this workflow execution.
func (e *mutableStateBuilder) updateReplicationStateLastEventID(clusterName string, lastEventID int64) {
func (e *mutableStateBuilder) updateReplicationStateLastEventID(clusterName string, lastWriteVersion,
lastEventID int64) {
e.replicationState.LastWriteVersion = lastWriteVersion
e.replicationState.LastWriteVersion = e.replicationState.CurrentVersion
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this line should be deleted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and the active workflow, when calling this function, should use the current version

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

// TODO: Rename this to NextEventID to stay consistent naming convention with rest of code base
e.replicationState.LastWriteEventID = lastEventID
Expand Down Expand Up @@ -2137,7 +2139,7 @@ func (e *mutableStateBuilder) ReplicateWorkflowExecutionContinuedAsNewEvent(sour
}

if newStateBuilder.replicationState != nil {
newStateBuilder.updateReplicationStateLastEventID(sourceClusterName, di.ScheduleID)
newStateBuilder.updateReplicationStateLastEventID(sourceClusterName, startedEvent.GetVersion(), di.ScheduleID)
}

newTransferTasks := []persistence.Task{&persistence.DecisionTask{
Expand Down
6 changes: 2 additions & 4 deletions service/history/stateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,14 @@ func newStateBuilder(shard ShardContext, msBuilder *mutableStateBuilder, logger
}
}

func (b *stateBuilder) applyEvents(domainID, requestID string, execution shared.WorkflowExecution,
history *shared.History, newRunHistory *shared.History) (*shared.HistoryEvent,
func (b *stateBuilder) applyEvents(domainID, requestID string,
execution shared.WorkflowExecution, history *shared.History, newRunHistory *shared.History) (*shared.HistoryEvent,
*decisionInfo, *mutableStateBuilder, error) {
var lastEvent *shared.HistoryEvent
var lastDecision *decisionInfo
var newRunStateBuilder *mutableStateBuilder
for _, event := range history.Events {
lastEvent = event
// must set the current version, since this is standby here, not active
b.msBuilder.updateReplicationStateVersion(event.GetVersion())
switch event.GetEventType() {
case shared.EventTypeWorkflowExecutionStarted:
attributes := event.WorkflowExecutionStartedEventAttributes
Expand Down
11 changes: 7 additions & 4 deletions service/history/workflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ func (c *workflowExecutionContext) replicateWorkflowExecution(request *h.Replica
c.msBuilder.executionInfo.NextEventID = nextEventID

builder := newHistoryBuilderFromEvents(request.History.Events, c.logger)
return c.updateHelper(builder, transferTasks, timerTasks, false, request.GetSourceCluster(), transactionID)
return c.updateHelper(builder, transferTasks, timerTasks, false, request.GetSourceCluster(), request.GetVersion(),
transactionID)
}

func (c *workflowExecutionContext) updateVersion() error {
Expand All @@ -166,11 +167,13 @@ func (c *workflowExecutionContext) updateWorkflowExecution(transferTasks []persi

// Only generate replication task if this is a global domain
createReplicationTask := c.msBuilder.replicationState != nil
return c.updateHelper(nil, transferTasks, timerTasks, createReplicationTask, "", transactionID)

lastWriteVersion := c.msBuilder.GetCurrentVersion()
return c.updateHelper(nil, transferTasks, timerTasks, createReplicationTask, "", lastWriteVersion, transactionID)
}

func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transferTasks []persistence.Task,
timerTasks []persistence.Task, createReplicationTask bool, sourceCluster string,
timerTasks []persistence.Task, createReplicationTask bool, sourceCluster string, lastWriteVersion int64,
transactionID int64) (errRet error) {

defer func() {
Expand All @@ -192,7 +195,7 @@ func (c *workflowExecutionContext) updateHelper(builder *historyBuilder, transfe
crossDCEnabled := c.msBuilder.replicationState != nil
if crossDCEnabled {
lastEventID := c.msBuilder.GetNextEventID() - 1
c.msBuilder.updateReplicationStateLastEventID(sourceCluster, lastEventID)
c.msBuilder.updateReplicationStateLastEventID(sourceCluster, lastWriteVersion, lastEventID)
}

// Replicator passes in a custom builder as it already has the events
Expand Down