Skip to content

Commit

Permalink
Validate workflow task start time when complete
Browse files Browse the repository at this point in the history
  • Loading branch information
yiminc committed Jul 21, 2023
1 parent fdb94a5 commit f791cd6
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 123 deletions.
159 changes: 100 additions & 59 deletions api/token/v1/message.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 10 additions & 8 deletions common/tasktoken/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,21 @@ func NewWorkflowTaskToken(
runID string,
scheduledEventID int64,
startedEventId int64,
startTimeUnixNano int64,
attempt int32,
clock *v11.VectorClock,
version int64,
) *tokenspb.Task {
return &tokenspb.Task{
NamespaceId: namespaceID,
WorkflowId: workflowID,
RunId: runID,
ScheduledEventId: scheduledEventID,
StartedEventId: startedEventId,
Attempt: attempt,
Clock: clock,
Version: version,
NamespaceId: namespaceID,
WorkflowId: workflowID,
RunId: runID,
ScheduledEventId: scheduledEventID,
StartedEventId: startedEventId,
StartTimeUnixnano: startTimeUnixNano,
Attempt: attempt,
Clock: clock,
Version: version,
}
}

Expand Down
1 change: 1 addition & 0 deletions proto/internal/temporal/server/api/token/v1/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ message Task {
temporal.server.api.clock.v1.VectorClock clock = 9;
int64 started_event_id = 10;
int64 version = 11;
int64 start_time_unixnano = 12;
}

message QueryTask {
Expand Down
1 change: 1 addition & 0 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,7 @@ func (wh *WorkflowHandler) RespondWorkflowTaskCompleted(
taskToken.GetRunId(),
histResp.StartedResponse.GetScheduledEventId(),
histResp.StartedResponse.GetStartedEventId(),
histResp.StartedResponse.GetStartedTime().UnixNano(),
histResp.StartedResponse.GetAttempt(),
histResp.StartedResponse.GetClock(),
histResp.StartedResponse.GetVersion(),
Expand Down
3 changes: 2 additions & 1 deletion service/history/api/startworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ import (
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/api/historyservice/v1"

"go.temporal.io/server/common/tasktoken"

"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/metrics"
Expand Down Expand Up @@ -543,6 +543,7 @@ func (s *Starter) generateResponse(
runID,
workflowTaskInfo.ScheduledEventID,
workflowTaskInfo.StartedEventID,
workflowTaskInfo.StartedTime.UnixNano(),
workflowTaskInfo.Attempt,
clock,
workflowTaskInfo.Version,
Expand Down
32 changes: 19 additions & 13 deletions service/history/workflow_task_handler_callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,11 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskFailed(
if workflowTask == nil ||
workflowTask.StartedEventID == common.EmptyEventID ||
(token.StartedEventId != common.EmptyEventID && token.StartedEventId != workflowTask.StartedEventID) ||
(token.StartTimeUnixnano != 0 && token.StartTimeUnixnano != workflowTask.StartedTime.UnixNano()) ||
workflowTask.Attempt != token.Attempt ||
(workflowTask.Version != common.EmptyVersion && token.Version != workflowTask.Version) {
// we have not alter mutable state yet, so release with it with nil to avoid clear MS.
workflowContext.GetReleaseFn()(nil)
return nil, serviceerror.NewNotFound("Workflow task not found.")
}

Expand Down Expand Up @@ -393,6 +396,22 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted(
if err != nil {
return nil, err
}
weContext := workflowContext.GetContext()
ms := workflowContext.GetMutableState()

currentWorkflowTask := ms.GetWorkflowTaskByID(token.GetScheduledEventId())
if !ms.IsWorkflowExecutionRunning() ||
currentWorkflowTask == nil ||
currentWorkflowTask.StartedEventID == common.EmptyEventID ||
(token.StartedEventId != common.EmptyEventID && token.StartedEventId != currentWorkflowTask.StartedEventID) ||
(token.StartTimeUnixnano != 0 && token.StartTimeUnixnano != currentWorkflowTask.StartedTime.UnixNano()) ||
currentWorkflowTask.Attempt != token.Attempt ||
(token.Version != common.EmptyVersion && token.Version != currentWorkflowTask.Version) {
// we have not alter mutable state yet, so release with it with nil to avoid clear MS.
workflowContext.GetReleaseFn()(nil)
return nil, serviceerror.NewNotFound("Workflow task not found.")
}

defer func() { workflowContext.GetReleaseFn()(retError) }()

var effects effect.Buffer
Expand All @@ -410,19 +429,6 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted(
effects.Apply(ctx)
}()

weContext := workflowContext.GetContext()
ms := workflowContext.GetMutableState()

currentWorkflowTask := ms.GetWorkflowTaskByID(token.GetScheduledEventId())
if !ms.IsWorkflowExecutionRunning() ||
currentWorkflowTask == nil ||
currentWorkflowTask.StartedEventID == common.EmptyEventID ||
(token.StartedEventId != common.EmptyEventID && token.StartedEventId != currentWorkflowTask.StartedEventID) ||
currentWorkflowTask.Attempt != token.Attempt ||
(token.Version != common.EmptyVersion && token.Version != currentWorkflowTask.Version) {
return nil, serviceerror.NewNotFound("Workflow task not found.")
}

// It's an error if the workflow has used versioning in the past but this task has no versioning info.
if ms.GetWorkerVersionStamp().GetUseVersioning() && !request.GetWorkerVersionStamp().GetUseVersioning() {
return nil, serviceerror.NewInvalidArgument("Workflow using versioning must continue to use versioning.")
Expand Down
Loading

0 comments on commit f791cd6

Please sign in to comment.