Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: parent workflow, when signaling child workflow, can experienc… #607

Merged
merged 2 commits into from
Mar 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions service/history/historyCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package history

import (
"sync/atomic"

workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cache"
Expand All @@ -44,6 +46,11 @@ type (
}
)

const (
cacheNotReleased int32 = 0
cacheReleased int32 = 1
)

var (
// ErrTryLock is a temporary error that is thrown by the API
// when it loses the race to create workflow execution context
Expand Down Expand Up @@ -107,9 +114,12 @@ func (c *historyCache) getOrCreateWorkflowExecution(domainID string,

// This will create a closure on every request.
// Consider revisiting this if it causes too much GC activity
status := cacheNotReleased
releaseFunc := func() {
context.Unlock()
c.Release(key)
if atomic.CompareAndSwapInt32(&status, cacheNotReleased, cacheReleased) {
context.Unlock()
c.Release(key)
}
}

context.Lock()
Expand Down
131 changes: 90 additions & 41 deletions service/history/transferQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,26 +328,28 @@ func (t *transferQueueProcessorImpl) processActivityTask(task *persistence.Trans
if err != nil {
return err
}
defer release()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you immediate call 'defer release()' after acquiring the release object?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there can be cases when cache is full and context, release being nil, while err not being nil, doing defer release() before error checking can lead to null pointer issue


var mb *mutableStateBuilder
mb, err = context.loadWorkflowExecution()
var msBuilder *mutableStateBuilder
msBuilder, err = context.loadWorkflowExecution()
timeout := int32(0)
if err != nil {
release()
if _, ok := err.(*workflow.EntityNotExistsError); ok {
// this could happen if this is a duplicate processing of the task, and the execution has already completed.
return nil
}
return err
}

if ai, found := mb.GetActivityInfo(task.ScheduleID); found {
if ai, found := msBuilder.GetActivityInfo(task.ScheduleID); found {
timeout = ai.ScheduleToStartTimeout
} else {
logging.LogDuplicateTransferTaskEvent(t.logger, persistence.TransferTaskTypeActivityTask, task.TaskID, task.ScheduleID)
}
release()

// release the context lock since we no longer need mutable state builder and
// the rest of logic is making RPC call, which takes time.
release()
if timeout != 0 {
err = t.matchingClient.AddActivityTask(nil, &m.AddActivityTaskRequest{
DomainUUID: common.StringPtr(targetDomainID),
Expand Down Expand Up @@ -382,27 +384,30 @@ func (t *transferQueueProcessorImpl) processDecisionTask(task *persistence.Trans
if err != nil {
return err
}
defer release()

var mb *mutableStateBuilder
mb, err = context.loadWorkflowExecution()
var msBuilder *mutableStateBuilder
msBuilder, err = context.loadWorkflowExecution()
if err != nil {
release()
if _, ok := err.(*workflow.EntityNotExistsError); ok {
// this could happen if this is a duplicate processing of the task, and the execution has already completed.
return nil
}
return err
}
timeout := mb.executionInfo.WorkflowTimeout
wfTypeName := mb.executionInfo.WorkflowTypeName
startTimestamp := mb.executionInfo.StartTimestamp
if mb.isStickyTaskListEnabled() {
taskList.Name = common.StringPtr(mb.executionInfo.StickyTaskList)

timeout := msBuilder.executionInfo.WorkflowTimeout
wfTypeName := msBuilder.executionInfo.WorkflowTypeName
startTimestamp := msBuilder.executionInfo.StartTimestamp
if msBuilder.isStickyTaskListEnabled() {
taskList.Name = common.StringPtr(msBuilder.executionInfo.StickyTaskList)
taskList.Kind = common.TaskListKindPtr(workflow.TaskListKindSticky)
timeout = mb.executionInfo.StickyScheduleToStartTimeout
timeout = msBuilder.executionInfo.StickyScheduleToStartTimeout
}
release()

// release the context lock since we no longer need mutable state builder and
// the rest of logic is making RPC call, which takes time.
release()
err = t.matchingClient.AddDecisionTask(nil, &m.AddDecisionTaskRequest{
DomainUUID: common.StringPtr(domainID),
Execution: &execution,
Expand Down Expand Up @@ -444,8 +449,8 @@ func (t *transferQueueProcessorImpl) processCloseExecution(task *persistence.Tra
}
defer release()

var mb *mutableStateBuilder
mb, err = context.loadWorkflowExecution()
var msBuilder *mutableStateBuilder
msBuilder, err = context.loadWorkflowExecution()
if err != nil {
if _, ok := err.(*workflow.EntityNotExistsError); ok {
// this could happen if this is a duplicate processing of the task, but the mutable state was
Expand All @@ -455,16 +460,34 @@ func (t *transferQueueProcessorImpl) processCloseExecution(task *persistence.Tra
return err
}

replyToParentWorkflow := msBuilder.hasParentExecution() && msBuilder.executionInfo.CloseStatus != persistence.WorkflowCloseStatusContinuedAsNew
var completionEvent *workflow.HistoryEvent
if replyToParentWorkflow {
completionEvent, _ = msBuilder.GetCompletionEvent()
}
parentDomainID := msBuilder.executionInfo.ParentDomainID
parentWorkflowID := msBuilder.executionInfo.ParentWorkflowID
parentRunID := msBuilder.executionInfo.ParentRunID
initiatedID := msBuilder.executionInfo.InitiatedID

workflowTypeName := msBuilder.executionInfo.WorkflowTypeName
workflowStartTimestamp := msBuilder.executionInfo.StartTimestamp.UnixNano()
workflowCloseTimestamp := msBuilder.getLastUpdatedTimestamp()
workflowCloseStatus := getWorkflowExecutionCloseStatus(msBuilder.executionInfo.CloseStatus)
workflowHistoryLength := msBuilder.GetNextEventID()

// release the context lock since we no longer need mutable state builder and
// the rest of logic is making RPC call, which takes time.
release()
// Communicate the result to parent execution if this is Child Workflow execution
if mb.hasParentExecution() && mb.executionInfo.CloseStatus != persistence.WorkflowCloseStatusContinuedAsNew {
completionEvent, _ := mb.GetCompletionEvent()
if replyToParentWorkflow {
err = t.historyClient.RecordChildExecutionCompleted(nil, &history.RecordChildExecutionCompletedRequest{
DomainUUID: common.StringPtr(mb.executionInfo.ParentDomainID),
DomainUUID: common.StringPtr(parentDomainID),
WorkflowExecution: &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(mb.executionInfo.ParentWorkflowID),
RunId: common.StringPtr(mb.executionInfo.ParentRunID),
WorkflowId: common.StringPtr(parentWorkflowID),
RunId: common.StringPtr(parentRunID),
},
InitiatedId: common.Int64Ptr(mb.executionInfo.InitiatedID),
InitiatedId: common.Int64Ptr(initiatedID),
CompletedExecution: &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(task.WorkflowID),
RunId: common.StringPtr(task.RunID),
Expand All @@ -476,11 +499,10 @@ func (t *transferQueueProcessorImpl) processCloseExecution(task *persistence.Tra
switch err.(type) {
case *workflow.EntityNotExistsError:
err = nil
default:
return err
}
}
if err != nil {
return err
}

// Record closing in visibility store
retentionSeconds := int64(0)
Expand All @@ -498,11 +520,11 @@ func (t *transferQueueProcessorImpl) processCloseExecution(task *persistence.Tra
return t.visibilityManager.RecordWorkflowExecutionClosed(&persistence.RecordWorkflowExecutionClosedRequest{
DomainUUID: task.DomainID,
Execution: execution,
WorkflowTypeName: mb.executionInfo.WorkflowTypeName,
StartTimestamp: mb.executionInfo.StartTimestamp.UnixNano(),
CloseTimestamp: mb.getLastUpdatedTimestamp(),
Status: getWorkflowExecutionCloseStatus(mb.executionInfo.CloseStatus),
HistoryLength: mb.GetNextEventID(),
WorkflowTypeName: workflowTypeName,
StartTimestamp: workflowStartTimestamp,
CloseTimestamp: workflowCloseTimestamp,
Status: workflowCloseStatus,
HistoryLength: workflowHistoryLength,
RetentionSeconds: retentionSeconds,
})
}
Expand Down Expand Up @@ -545,6 +567,29 @@ func (t *transferQueueProcessorImpl) processCancelExecution(task *persistence.Tr
// No pending request cancellation for this initiatedID, complete this transfer task
return nil
}

// handle workflow cancel itself
if domainID == targetDomainID && task.WorkflowID == task.TargetWorkflowID {
// it does not matter if the run ID is a mismatch
cancelRequest := &history.RequestCancelWorkflowExecutionRequest{
DomainUUID: common.StringPtr(domainID),
CancelRequest: &workflow.RequestCancelWorkflowExecutionRequest{
Domain: common.StringPtr(targetDomainID),
WorkflowExecution: &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(task.TargetWorkflowID),
RunId: common.StringPtr(task.TargetRunID),
},
Identity: common.StringPtr(identityHistoryService),
},
}
err = t.requestCancelFailed(task, context, cancelRequest)
if _, ok := err.(*workflow.EntityNotExistsError); ok {
// this could happen if this is a duplicate processing of the task, and the execution has already completed.
return nil
}
return err
}

cancelRequest := &history.RequestCancelWorkflowExecutionRequest{
DomainUUID: common.StringPtr(targetDomainID),
CancelRequest: &workflow.RequestCancelWorkflowExecutionRequest{
Expand Down Expand Up @@ -643,13 +688,14 @@ func (t *transferQueueProcessorImpl) processSignalExecution(task *persistence.Tr

// handle workflow signal itself
if domainID == targetDomainID && task.WorkflowID == task.TargetWorkflowID {
// it does not matter if the run ID is a mismatch
signalRequest := &history.SignalWorkflowExecutionRequest{
DomainUUID: common.StringPtr(domainID),
DomainUUID: common.StringPtr(targetDomainID),
SignalRequest: &workflow.SignalWorkflowExecutionRequest{
Domain: common.StringPtr(domainID),
Domain: common.StringPtr(targetDomainID),
WorkflowExecution: &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(task.WorkflowID),
RunId: common.StringPtr(task.RunID),
WorkflowId: common.StringPtr(task.TargetWorkflowID),
RunId: common.StringPtr(task.TargetRunID),
},
Identity: common.StringPtr(identityHistoryService),
Control: ri.Control,
Expand Down Expand Up @@ -711,6 +757,9 @@ func (t *transferQueueProcessorImpl) processSignalExecution(task *persistence.Tr
return nil
}

// release the context lock since we no longer need mutable state builder and
// the rest of logic is making RPC call, which takes time.
release()
// remove signalRequestedID from target workflow, after Signal detail is removed from source workflow
removeRequest := &history.RemoveSignalMutableStateRequest{
DomainUUID: common.StringPtr(targetDomainID),
Expand Down Expand Up @@ -929,9 +978,9 @@ func (t *transferQueueProcessorImpl) requestCancelCompleted(task *persistence.Tr

msBuilder.AddExternalWorkflowExecutionCancelRequested(
initiatedEventID,
*request.DomainUUID,
*request.CancelRequest.WorkflowExecution.WorkflowId,
common.StringDefault(request.CancelRequest.WorkflowExecution.RunId),
request.GetDomainUUID(),
request.CancelRequest.WorkflowExecution.GetWorkflowId(),
request.CancelRequest.WorkflowExecution.GetRunId(),
)

return nil
Expand Down Expand Up @@ -983,9 +1032,9 @@ func (t *transferQueueProcessorImpl) requestCancelFailed(task *persistence.Trans
msBuilder.AddRequestCancelExternalWorkflowExecutionFailedEvent(
emptyEventID,
initiatedEventID,
*request.DomainUUID,
*request.CancelRequest.WorkflowExecution.WorkflowId,
common.StringDefault(request.CancelRequest.WorkflowExecution.RunId),
request.GetDomainUUID(),
request.CancelRequest.WorkflowExecution.GetWorkflowId(),
request.CancelRequest.WorkflowExecution.GetRunId(),
workflow.CancelExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution)

return nil
Expand Down