Skip to content

Commit

Permalink
Bugfix: rpc call cancel function should not use defer in for loop (#986)
Browse files Browse the repository at this point in the history
* Bugfix: rpc call cancel function should not use defer in for loop
* Add more logs
  • Loading branch information
wxing1292 authored Jul 20, 2018
1 parent c19a5e6 commit 992a81a
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 9 deletions.
1 change: 1 addition & 0 deletions common/logging/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const (
TagIncomingVersion = "incoming-version"
TagFirstEventID = "first-event-id"
TagNextEventID = "next-event-id"
TagResetNextEventID = "reset-next-event-id"
TagTimeoutType = "timeout-type"
TagReplicationInfo = "replication-info"
TagAttemptCount = "attempt-count"
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ func (s *TestBase) UpdateWorkflowExecutionForSignal(
})
}

// UpdateWorkflowExecutionForSignal is a utility method to update workflow execution
// UpdateWorkflowExecutionForBufferEvents is a utility method to update workflow execution
func (s *TestBase) UpdateWorkflowExecutionForBufferEvents(
updatedInfo *WorkflowExecutionInfo, rState *ReplicationState, condition int64,
bufferEvents *SerializedHistoryEventBatch) error {
Expand Down
21 changes: 16 additions & 5 deletions service/history/conflictResolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/logging"
"github.com/uber/cadence/common/persistence"
)

Expand Down Expand Up @@ -73,9 +74,10 @@ func (r *conflictResolverImpl) reset(requestID string, replayEventID int64, star
for hasMore := true; hasMore; hasMore = len(nextPageToken) > 0 {
history, nextPageToken, lastFirstEventID, err = r.getHistory(domainID, execution, common.FirstEventID,
replayNextEventID, nextPageToken)
r.logger.Debugf("Conflict Resolver GetHistory. History Length: %v, token: %v, err: %v",
r.logger.Debugf("Conflict Resolver GetHistory. History Length: %v, token: %v, err: %v",
len(history.Events), nextPageToken, err)
if err != nil {
r.logError("Conflict resolution err getting history.", err)
return nil, err
}

Expand Down Expand Up @@ -103,6 +105,7 @@ func (r *conflictResolverImpl) reset(requestID string, replayEventID int64, star

_, _, _, err = sBuilder.applyEvents(domainID, requestID, execution, history, nil)
if err != nil {
r.logError("Conflict resolution err applying events.", err)
return nil, err
}
resetMutableStateBuilder.executionInfo.LastFirstEventID = lastFirstEventID
Expand All @@ -117,10 +120,12 @@ func (r *conflictResolverImpl) reset(requestID string, replayEventID int64, star
sourceCluster := r.clusterMetadata.ClusterNameForFailoverVersion(lastEvent.GetVersion())
resetMutableStateBuilder.UpdateReplicationStateLastEventID(sourceCluster, lastEvent.GetVersion(), replayEventID)

r.logger.Infof("All events applied for execution. WorkflowID: %v, RunID: %v, NextEventID: %v",
execution.GetWorkflowId(), execution.GetRunId(), resetMutableStateBuilder.GetNextEventID())

return r.context.resetWorkflowExecution(resetMutableStateBuilder)
r.logger.WithField(logging.TagResetNextEventID, resetMutableStateBuilder.GetNextEventID()).Info("All events applied for execution.")
msBuilder, err := r.context.resetWorkflowExecution(resetMutableStateBuilder)
if err != nil {
r.logError("Conflict resolution err reset workflow.", err)
}
return msBuilder, err
}

func (r *conflictResolverImpl) getHistory(domainID string, execution shared.WorkflowExecution, firstEventID,
Expand Down Expand Up @@ -158,3 +163,9 @@ func (r *conflictResolverImpl) getHistory(domainID string, execution shared.Work
executionHistory.Events = historyEvents
return executionHistory, response.NextPageToken, lastFirstEventID, nil
}

// logError writes msg at error level through the resolver's logger,
// attaching err as a structured field keyed by logging.TagErr.
func (r *conflictResolverImpl) logError(msg string, err error) {
	errFields := bark.Fields{logging.TagErr: err}
	r.logger.WithFields(errFields).Error(msg)
}
14 changes: 13 additions & 1 deletion service/history/historyReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,11 +697,13 @@ func (r *historyReplicator) conflictResolutionTerminateContinueAsNew(ctx context

if msBuilder.IsWorkflowExecutionRunning() {
// workflow still running, no continued as new edge case to solve
logger.Info("Conflict resolution workflow running, skip.")
return nil
}

if msBuilder.GetExecutionInfo().CloseStatus != persistence.WorkflowCloseStatusContinuedAsNew {
// workflow close status not being continue as new
logger.Info("Conflict resolution workflow finished not continue as new.")
return nil
}

Expand All @@ -714,6 +716,7 @@ func (r *historyReplicator) conflictResolutionTerminateContinueAsNew(ctx context
workflowID := msBuilder.GetExecutionInfo().WorkflowID
_, currentMutableState, currentRelease, err := r.getCurrentWorkflowMutableState(ctx, domainID, workflowID)
if err != nil {
logger.Info("Conflict resolution error getting current workflow.")
return err
}
currentRunID := currentMutableState.GetExecutionInfo().RunID
Expand All @@ -722,6 +725,7 @@ func (r *historyReplicator) conflictResolutionTerminateContinueAsNew(ctx context
if currentCloseStatus != persistence.WorkflowCloseStatusNone {
// current workflow finished
// note: it is impossible for a current workflow to end with continue as new as its close status
logger.Info("Conflict resolution current workflow finished.")
return nil
}

Expand All @@ -738,6 +742,7 @@ func (r *historyReplicator) conflictResolutionTerminateContinueAsNew(ctx context
NextPageToken: nil,
})
if err != nil {
r.logError(logger, "Conflict resolution current workflow finished.", err)
return "", err
}
if len(response.Events) == 0 {
Expand All @@ -752,10 +757,12 @@ func (r *historyReplicator) conflictResolutionTerminateContinueAsNew(ctx context
persistence.SetSerializedHistoryDefaults(&serializedHistoryEventBatch)
serializer, err := persistence.NewHistorySerializerFactory().Get(serializedHistoryEventBatch.EncodingType)
if err != nil {
r.logError(logger, "Conflict resolution error getting serializer.", err)
return "", err
}
history, err := serializer.Deserialize(&serializedHistoryEventBatch)
if err != nil {
r.logError(logger, "Conflict resolution error deserialize events.", err)
return "", err
}
if len(history.Events) == 0 {
Expand All @@ -781,6 +788,7 @@ func (r *historyReplicator) conflictResolutionTerminateContinueAsNew(ctx context
}
if runID == "" {
// cannot relate the current running workflow to the workflow whose events are being reset.
logger.Info("Conflict resolution current workflow is not related.")
return nil
}

Expand All @@ -793,7 +801,11 @@ func (r *historyReplicator) conflictResolutionTerminateContinueAsNew(ctx context
// we will retry on the worker level

// same workflow ID, same shard
return r.terminateWorkflow(ctx, domainID, workflowID, currentRunID)
err = r.terminateWorkflow(ctx, domainID, workflowID, currentRunID)
if err != nil {
r.logError(logger, "Conflict resolution err terminating current workflow.", err)
}
return err
}

func (r *historyReplicator) Serialize(history *shared.History) (*persistence.SerializedHistoryEventBatch, error) {
Expand Down
3 changes: 1 addition & 2 deletions service/worker/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,8 @@ Loop:
RetryLoop:
for i := 0; i < p.config.ReplicatorBufferRetryCount; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

err = p.historyClient.ReplicateEvents(ctx, req)
cancel()

// Replication tasks could be slightly out of order for a particular workflow execution
// We first try to apply the events without buffering enabled with a small delay to account for such delays
Expand Down

0 comments on commit 992a81a

Please sign in to comment.