From b98524b58989c268d706408944f7cbb9427eec3a Mon Sep 17 00:00:00 2001 From: wxing1292 Date: Fri, 12 Jan 2018 15:05:08 -0800 Subject: [PATCH] add handling of child workflow ID reuse logic (#500) --- .gen/go/shared/idl.go | 2 +- .gen/go/shared/types.go | 150 ++++++++++++++++------ idl/github.com/uber/cadence/shared.thrift | 2 + service/history/historyBuilder.go | 1 + service/history/transferQueueProcessor.go | 3 +- 5 files changed, 117 insertions(+), 41 deletions(-) diff --git a/.gen/go/shared/idl.go b/.gen/go/shared/idl.go index e65f6c8a189..906468c0f32 100644 --- a/.gen/go/shared/idl.go +++ b/.gen/go/shared/idl.go @@ -30,7 +30,7 @@ var ThriftModule = &thriftreflect.ThriftModule{ Name: "shared", Package: "github.com/uber/cadence/.gen/go/shared", FilePath: "shared.thrift", - SHA1: "c066d327e24384e16775fb9fccd5eb435f382a02", + SHA1: "c8ed90fd99be82b9358829f0ec4d84d259e68021", Raw: rawIDL, } diff --git a/.gen/go/shared/types.go b/.gen/go/shared/types.go index 63e4dadae78..bfad880bbe6 100644 --- a/.gen/go/shared/types.go +++ b/.gen/go/shared/types.go @@ -19525,15 +19525,16 @@ func (v *SignalWorkflowExecutionRequest) GetIdentity() (o string) { } type StartChildWorkflowExecutionDecisionAttributes struct { - Domain *string `json:"domain,omitempty"` - WorkflowId *string `json:"workflowId,omitempty"` - WorkflowType *WorkflowType `json:"workflowType,omitempty"` - TaskList *TaskList `json:"taskList,omitempty"` - Input []byte `json:"input,omitempty"` - ExecutionStartToCloseTimeoutSeconds *int32 `json:"executionStartToCloseTimeoutSeconds,omitempty"` - TaskStartToCloseTimeoutSeconds *int32 `json:"taskStartToCloseTimeoutSeconds,omitempty"` - ChildPolicy *ChildPolicy `json:"childPolicy,omitempty"` - Control []byte `json:"control,omitempty"` + Domain *string `json:"domain,omitempty"` + WorkflowId *string `json:"workflowId,omitempty"` + WorkflowType *WorkflowType `json:"workflowType,omitempty"` + TaskList *TaskList `json:"taskList,omitempty"` + Input []byte `json:"input,omitempty"` + ExecutionStartToCloseTimeoutSeconds *int32 `json:"executionStartToCloseTimeoutSeconds,omitempty"` + TaskStartToCloseTimeoutSeconds *int32 `json:"taskStartToCloseTimeoutSeconds,omitempty"` + ChildPolicy *ChildPolicy `json:"childPolicy,omitempty"` + Control []byte `json:"control,omitempty"` + WorkflowIdReusePolicy *WorkflowIdReusePolicy `json:"workflowIdReusePolicy,omitempty"` } // ToWire translates a StartChildWorkflowExecutionDecisionAttributes struct into a Thrift-level intermediate @@ -19553,7 +19554,7 @@ type StartChildWorkflowExecutionDecisionAttributes struct { // } func (v *StartChildWorkflowExecutionDecisionAttributes) ToWire() (wire.Value, error) { var ( - fields [9]wire.Field + fields [10]wire.Field i int = 0 w wire.Value err error @@ -19631,6 +19632,14 @@ func (v *StartChildWorkflowExecutionDecisionAttributes) ToWire() (wire.Value, er fields[i] = wire.Field{ID: 90, Value: w} i++ } + if v.WorkflowIdReusePolicy != nil { + w, err = v.WorkflowIdReusePolicy.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 100, Value: w} + i++ + } return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil } @@ -19641,6 +19650,12 @@ func _ChildPolicy_Read(w wire.Value) (ChildPolicy, error) { return v, err } +func _WorkflowIdReusePolicy_Read(w wire.Value) (WorkflowIdReusePolicy, error) { + var v WorkflowIdReusePolicy + err := v.FromWire(w) + return v, err +} + // FromWire deserializes a StartChildWorkflowExecutionDecisionAttributes struct from its Thrift-level // representation. The Thrift-level representation may be obtained // from a ThriftRW protocol implementation. @@ -19744,6 +19759,16 @@ func (v *StartChildWorkflowExecutionDecisionAttributes) FromWire(w wire.Value) e return err } + } + case 100: + if field.Value.Type() == wire.TI32 { + var x WorkflowIdReusePolicy + x, err = _WorkflowIdReusePolicy_Read(field.Value) + v.WorkflowIdReusePolicy = &x + if err != nil { + return err + } + } } } @@ -19758,7 +19783,7 @@ func (v *StartChildWorkflowExecutionDecisionAttributes) String() string { return "" } - var fields [9]string + var fields [10]string i := 0 if v.Domain != nil { fields[i] = fmt.Sprintf("Domain: %v", *(v.Domain)) @@ -19796,6 +19821,10 @@ func (v *StartChildWorkflowExecutionDecisionAttributes) String() string { fields[i] = fmt.Sprintf("Control: %v", v.Control) i++ } + if v.WorkflowIdReusePolicy != nil { + fields[i] = fmt.Sprintf("WorkflowIdReusePolicy: %v", *(v.WorkflowIdReusePolicy)) + i++ + } return fmt.Sprintf("StartChildWorkflowExecutionDecisionAttributes{%v}", strings.Join(fields[:i], ", ")) } @@ -19810,6 +19839,16 @@ func _ChildPolicy_EqualsPtr(lhs, rhs *ChildPolicy) bool { return lhs == nil && rhs == nil } +func _WorkflowIdReusePolicy_EqualsPtr(lhs, rhs *WorkflowIdReusePolicy) bool { + if lhs != nil && rhs != nil { + + x := *lhs + y := *rhs + return x.Equals(y) + } + return lhs == nil && rhs == nil +} + // Equals returns true if all the fields of this StartChildWorkflowExecutionDecisionAttributes match the // provided StartChildWorkflowExecutionDecisionAttributes. // @@ -19842,6 +19881,9 @@ func (v *StartChildWorkflowExecutionDecisionAttributes) Equals(rhs *StartChildWo if !((v.Control == nil && rhs.Control == nil) || (v.Control != nil && rhs.Control != nil && bytes.Equal(v.Control, rhs.Control))) { return false } + if !_WorkflowIdReusePolicy_EqualsPtr(v.WorkflowIdReusePolicy, rhs.WorkflowIdReusePolicy) { + return false + } return true } @@ -19896,6 +19938,16 @@ func (v *StartChildWorkflowExecutionDecisionAttributes) GetChildPolicy() (o Chil return } +// GetWorkflowIdReusePolicy returns the value of WorkflowIdReusePolicy if it is set or its +// zero value if it is unset. +func (v *StartChildWorkflowExecutionDecisionAttributes) GetWorkflowIdReusePolicy() (o WorkflowIdReusePolicy) { + if v.WorkflowIdReusePolicy != nil { + return *v.WorkflowIdReusePolicy + } + + return +} + type StartChildWorkflowExecutionFailedEventAttributes struct { Domain *string `json:"domain,omitempty"` WorkflowId *string `json:"workflowId,omitempty"` @@ -20221,16 +20273,17 @@ func (v *StartChildWorkflowExecutionFailedEventAttributes) GetDecisionTaskComple } type StartChildWorkflowExecutionInitiatedEventAttributes struct { - Domain *string `json:"domain,omitempty"` - WorkflowId *string `json:"workflowId,omitempty"` - WorkflowType *WorkflowType `json:"workflowType,omitempty"` - TaskList *TaskList `json:"taskList,omitempty"` - Input []byte `json:"input,omitempty"` - ExecutionStartToCloseTimeoutSeconds *int32 `json:"executionStartToCloseTimeoutSeconds,omitempty"` - TaskStartToCloseTimeoutSeconds *int32 `json:"taskStartToCloseTimeoutSeconds,omitempty"` - ChildPolicy *ChildPolicy `json:"childPolicy,omitempty"` - Control []byte `json:"control,omitempty"` - DecisionTaskCompletedEventId *int64 `json:"decisionTaskCompletedEventId,omitempty"` + Domain *string `json:"domain,omitempty"` + WorkflowId *string `json:"workflowId,omitempty"` + WorkflowType *WorkflowType `json:"workflowType,omitempty"` + TaskList *TaskList `json:"taskList,omitempty"` + Input []byte `json:"input,omitempty"` + ExecutionStartToCloseTimeoutSeconds *int32 `json:"executionStartToCloseTimeoutSeconds,omitempty"` + TaskStartToCloseTimeoutSeconds *int32 `json:"taskStartToCloseTimeoutSeconds,omitempty"` + ChildPolicy *ChildPolicy `json:"childPolicy,omitempty"` + Control []byte `json:"control,omitempty"` + DecisionTaskCompletedEventId *int64 `json:"decisionTaskCompletedEventId,omitempty"` + WorkflowIdReusePolicy *WorkflowIdReusePolicy `json:"workflowIdReusePolicy,omitempty"` } // ToWire translates a StartChildWorkflowExecutionInitiatedEventAttributes struct into a Thrift-level intermediate @@ -20250,7 +20303,7 @@ type StartChildWorkflowExecutionInitiatedEventAttributes struct { // } func (v *StartChildWorkflowExecutionInitiatedEventAttributes) ToWire() (wire.Value, error) { var ( - fields [10]wire.Field + fields [11]wire.Field i int = 0 w wire.Value err error @@ -20336,6 +20389,14 @@ func (v *StartChildWorkflowExecutionInitiatedEventAttributes) ToWire() (wire.Val fields[i] = wire.Field{ID: 100, Value: w} i++ } + if v.WorkflowIdReusePolicy != nil { + w, err = v.WorkflowIdReusePolicy.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 110, Value: w} + i++ + } return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil } @@ -20453,6 +20514,16 @@ func (v *StartChildWorkflowExecutionInitiatedEventAttributes) FromWire(w wire.Va return err } + } + case 110: + if field.Value.Type() == wire.TI32 { + var x WorkflowIdReusePolicy + x, err = _WorkflowIdReusePolicy_Read(field.Value) + v.WorkflowIdReusePolicy = &x + if err != nil { + return err + } + } } } @@ -20467,7 +20538,7 @@ func (v *StartChildWorkflowExecutionInitiatedEventAttributes) String() string { return "" } - var fields [10]string + var fields [11]string i := 0 if v.Domain != nil { fields[i] = fmt.Sprintf("Domain: %v", *(v.Domain)) @@ -20509,6 +20580,10 @@ func (v *StartChildWorkflowExecutionInitiatedEventAttributes) String() string { fields[i] = fmt.Sprintf("DecisionTaskCompletedEventId: %v", *(v.DecisionTaskCompletedEventId)) i++ } + if v.WorkflowIdReusePolicy != nil { + fields[i] = fmt.Sprintf("WorkflowIdReusePolicy: %v", *(v.WorkflowIdReusePolicy)) + i++ + } return fmt.Sprintf("StartChildWorkflowExecutionInitiatedEventAttributes{%v}", strings.Join(fields[:i], ", ")) } @@ -20548,6 +20623,9 @@ func (v *StartChildWorkflowExecutionInitiatedEventAttributes) Equals(rhs *StartC if !_I64_EqualsPtr(v.DecisionTaskCompletedEventId, rhs.DecisionTaskCompletedEventId) { return false } + if !_WorkflowIdReusePolicy_EqualsPtr(v.WorkflowIdReusePolicy, rhs.WorkflowIdReusePolicy) { + return false + } return true } @@ -20612,6 +20690,16 @@ func (v *StartChildWorkflowExecutionInitiatedEventAttributes) GetDecisionTaskCom return } +// GetWorkflowIdReusePolicy returns the value of WorkflowIdReusePolicy if it is set or its +// zero value if it is unset. +func (v *StartChildWorkflowExecutionInitiatedEventAttributes) GetWorkflowIdReusePolicy() (o WorkflowIdReusePolicy) { + if v.WorkflowIdReusePolicy != nil { + return *v.WorkflowIdReusePolicy + } + + return +} + type StartTimeFilter struct { EarliestTime *int64 `json:"earliestTime,omitempty"` LatestTime *int64 `json:"latestTime,omitempty"` @@ -21036,12 +21124,6 @@ func (v *StartWorkflowExecutionRequest) ToWire() (wire.Value, error) { return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil } -func _WorkflowIdReusePolicy_Read(w wire.Value) (WorkflowIdReusePolicy, error) { - var v WorkflowIdReusePolicy - err := v.FromWire(w) - return v, err -} - // FromWire deserializes a StartWorkflowExecutionRequest struct from its Thrift-level // representation. The Thrift-level representation may be obtained // from a ThriftRW protocol implementation. @@ -21217,16 +21299,6 @@ func (v *StartWorkflowExecutionRequest) String() string { return fmt.Sprintf("StartWorkflowExecutionRequest{%v}", strings.Join(fields[:i], ", ")) } -func _WorkflowIdReusePolicy_EqualsPtr(lhs, rhs *WorkflowIdReusePolicy) bool { - if lhs != nil && rhs != nil { - - x := *lhs - y := *rhs - return x.Equals(y) - } - return lhs == nil && rhs == nil -} - // Equals returns true if all the fields of this StartWorkflowExecutionRequest match the // provided StartWorkflowExecutionRequest. // diff --git a/idl/github.com/uber/cadence/shared.thrift b/idl/github.com/uber/cadence/shared.thrift index 3b297c31be4..24b7b875d80 100644 --- a/idl/github.com/uber/cadence/shared.thrift +++ b/idl/github.com/uber/cadence/shared.thrift @@ -306,6 +306,7 @@ struct StartChildWorkflowExecutionDecisionAttributes { 70: optional i32 taskStartToCloseTimeoutSeconds 80: optional ChildPolicy childPolicy 90: optional binary control + 100: optional WorkflowIdReusePolicy workflowIdReusePolicy } struct Decision { @@ -538,6 +539,7 @@ struct StartChildWorkflowExecutionInitiatedEventAttributes { 80: optional ChildPolicy childPolicy 90: optional binary control 100: optional i64 (js.type = "Long") decisionTaskCompletedEventId + 110: optional WorkflowIdReusePolicy workflowIdReusePolicy } struct StartChildWorkflowExecutionFailedEventAttributes { diff --git a/service/history/historyBuilder.go b/service/history/historyBuilder.go index fa9ca030c29..b0b61469546 100644 --- a/service/history/historyBuilder.go +++ b/service/history/historyBuilder.go @@ -707,6 +707,7 @@ func (b *historyBuilder) newStartChildWorkflowExecutionInitiatedEvent(decisionTa attributes.ChildPolicy = common.ChildPolicyPtr(*startAttributes.ChildPolicy) attributes.Control = startAttributes.Control attributes.DecisionTaskCompletedEventId = common.Int64Ptr(decisionTaskCompletedEventID) + attributes.WorkflowIdReusePolicy = startAttributes.WorkflowIdReusePolicy historyEvent.StartChildWorkflowExecutionInitiatedEventAttributes = attributes return historyEvent diff --git a/service/history/transferQueueProcessor.go b/service/history/transferQueueProcessor.go index 8b137e2565b..8263c712f80 100644 --- a/service/history/transferQueueProcessor.go +++ b/service/history/transferQueueProcessor.go @@ -634,7 +634,8 @@ func (t *transferQueueProcessorImpl) processStartChildExecution(task *persistenc ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(*attributes.ExecutionStartToCloseTimeoutSeconds), TaskStartToCloseTimeoutSeconds: common.Int32Ptr(*attributes.TaskStartToCloseTimeoutSeconds), // Use the same request ID to dedupe StartWorkflowExecution calls - RequestId: common.StringPtr(ci.CreateRequestID), + RequestId: common.StringPtr(ci.CreateRequestID), + WorkflowIdReusePolicy: attributes.WorkflowIdReusePolicy, }, ParentExecutionInfo: &history.ParentExecutionInfo{ DomainUUID: common.StringPtr(domainID),