Expose history size to workflows #3055
Conversation
@@ -221,9 +233,6 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduleToStartTimeoutEvent(
 		return nil, m.ms.createInternalServerError(opTag)
 	}
 
-	// clear stickiness whenever workflow task fails
-	m.ms.ClearStickyness()
this isn't needed here because ReplicateWorkflowTaskTimedOutEvent(enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START) below will always call ClearStickyness itself.
It could affect the incrementTimeout calculation in FailWorkflowTask, but because the timeout type is enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START, it is false anyway, so I think it is safe to remove it from here.
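A minimal, self-contained sketch of the call path being described, using simplified stand-in types; the body of ReplicateWorkflowTaskTimedOutEvent is assumed from the comment above, not copied from the implementation:

```go
// Sketch only: illustrates why the explicit ClearStickyness call removed above is
// redundant, assuming (per the review comment) that ReplicateWorkflowTaskTimedOutEvent
// always clears stickiness itself. Types are simplified stand-ins.
package sketch

type timeoutType int

const timeoutTypeScheduleToStart timeoutType = 1

type mutableState struct{ stickyTaskQueue string }

// ClearStickyness keeps the real method's name (including its spelling).
func (ms *mutableState) ClearStickyness() { ms.stickyTaskQueue = "" }

type workflowTaskStateMachine struct{ ms *mutableState }

// AddWorkflowTaskScheduleToStartTimeoutEvent no longer clears stickiness directly;
// the replicate step below always does it.
func (m *workflowTaskStateMachine) AddWorkflowTaskScheduleToStartTimeoutEvent() {
	m.ReplicateWorkflowTaskTimedOutEvent(timeoutTypeScheduleToStart)
}

func (m *workflowTaskStateMachine) ReplicateWorkflowTaskTimedOutEvent(tt timeoutType) {
	// Other timed-out bookkeeping elided; stickiness is cleared for every timeout type.
	m.ms.ClearStickyness()
}
```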
I think we only need the values in WorkflowTaskStartedEventAttributes.
Could you explain the purpose of the values in ExecutionInfo, and the purpose of the ones in WorkflowTaskInfo?
    bool workflow_task_suggest_continue_as_new = 67;
    int64 workflow_task_history_size_bytes = 68;

    bool cancel_requested = 29;
shall we keep the field numbers in order?
no, I'd much rather see fields grouped by function
service/history/configs/config.go
HistorySizeSuggestContinueAsNew:  dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistorySizeSuggestContinueAsNew, 2*1024*1024),
HistoryCountSuggestContinueAsNew: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryCountSuggestContinueAsNew, 2*1024),
Feels a bit aggressive. Maybe double it to 4MB and 4K, which is still arbitrary. But the gRPC default size limit is also 4MB. :)
I don't have a great feel for it, so I'll trust your judgement here. I'm curious what the SDK team would say.
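For concreteness, the proposed bump would just change the defaults in the registrations shown above; a sketch in the same style as the diff, keeping the key names from it:

```go
// Same registrations as in the hunk above, with the defaults doubled to 4MB / 4K events.
HistorySizeSuggestContinueAsNew:  dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistorySizeSuggestContinueAsNew, 4*1024*1024),
HistoryCountSuggestContinueAsNew: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryCountSuggestContinueAsNew, 4*1024),
```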
	if stats == nil {
		return false, 0
	}
	// QUESTION: in some cases we might have history events in memory that we haven't written
do you mean buffered events? Those are not visible to the workflow yet, so I think they should not be counted.
Not exactly... look at where this is called. Consider the case where we had buffered events, and then we retry the wft. So on line 400, we do AddWorkflowTaskScheduledEvent and then reset Attempt to 1, so we end up here. That WorkflowTaskScheduled event will be visible to the workflow, but it won't be counted in HistorySize here, since that only gets updated when the transaction is closed.
If I get it right, stats is updated on write. We get here when some events have been added to history but not persisted yet, and stats doesn't reflect them, while m.ms.GetNextEventID() is the current in-memory last event. I think they are not consistent here, but this is probably not a big deal.
Not consistent (in the sense I described) would be a big deal. But I think they are consistent but just not accurate, which isn't a big deal
Sorry, by "consistent" I mean consistency between "size" and "count". "Accurate" is a better word, yes.
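To make the shape of this check concrete, here is a rough, self-contained sketch of a getHistorySizeInfo-style helper; the types and exact bookkeeping are simplified assumptions, not the PR's implementation:

```go
// Simplified stand-ins for the real mutable-state types; only the control flow from the
// hunk above (the nil-stats guard) is taken verbatim.
package sketch

type executionStats struct {
	HistorySize int64 // bytes written so far; only updated when a transaction is closed
}

type suggestConfig struct {
	HistorySizeSuggestContinueAsNew  int64 // e.g. 2*1024*1024, per the config hunk above
	HistoryCountSuggestContinueAsNew int64 // e.g. 2*1024
}

// getHistorySizeInfo returns (suggestContinueAsNew, historySizeBytes). As discussed in
// the thread, stats can lag the in-memory events slightly; that is tolerable as long as
// size and count are used consistently.
func getHistorySizeInfo(stats *executionStats, nextEventID int64, cfg suggestConfig) (bool, int64) {
	if stats == nil {
		return false, 0
	}
	historySizeBytes := stats.HistorySize
	historyCount := nextEventID - 1 // approximate event count from the next event ID
	suggest := historySizeBytes >= cfg.HistorySizeSuggestContinueAsNew ||
		historyCount >= cfg.HistoryCountSuggestContinueAsNew
	return suggest, historySizeBytes
}
```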
	// QUESTION: should we preserve these here? this is used by mutable state rebuilder. it
	// seems like the same logic as case 1 above applies: if a failover happens right after
	// this, then AddWorkflowTaskStartedEvent will rewrite these anyway. is that correct?
I'm not sure about this. cc @yycptt
Yeah, I think the value here doesn't matter, as it will get overwritten anyway, either when starting the workflow task (if a failover happens) or when replicating the started event (no failover).
Same here. Let's not set them until the WT is started.
It's an obscure corner case, but I thought in the discussion we decided we needed to handle it: what if you send the worker a WorkflowTaskStarted event with SuggestContinueAsNew == false, and it fails/times out? Then you send a second attempt, which is now a transient WFT, with SuggestContinueAsNew == false. Then you change dynamic config so that the same history size now makes SuggestContinueAsNew == true. Now the worker responds to the WFT successfully, and you have to write out the transient events to history. If you re-evaluate SuggestContinueAsNew at that point and write a WorkflowTaskStarted event with it as true, you'll get a determinism error on replay (assuming the workflow follows the suggestion). If we didn't use dynamic config, I agree we wouldn't have to keep it in mutable state.
@@ -1209,32 +1209,6 @@ func (e *MutableStateImpl) DeleteUserTimer(
 	return nil
 }
 
-// nolint:unused
This is already gone.
host/transient_task_test.go
}

// workflow logic
stage := 0
I call it wtHandlerCalls.
@@ -87,16 +87,21 @@ message WorkflowExecutionInfo {
     int64 last_workflow_task_started_event_id = 19;
     google.protobuf.Timestamp start_time = 20 [(gogoproto.stdtime) = true];
     google.protobuf.Timestamp last_update_time = 21 [(gogoproto.stdtime) = true];
 
+    // This group of fields contains info about the current in-flight workflow task
Suggested change:
-    // This group of fields contains info about the current in-flight workflow task
+    // This group of fields contains info about the current workflow task

"in-flight" means running in other places. I reordered these already too.
SuggestContinueAsNew: suggestContinueAsNew,
HistorySizeBytes:     historySizeBytes,
Oh, the attributes have already been there for half a year.
	// These two fields are sent to workers in the WorkflowTaskStarted event. We need to save a
	// copy here to ensure that we send the same values with every transient WorkflowTaskStarted
	// event, otherwise a dynamic config change of the suggestion threshold could cause the
	// event that the worker used to not match the event we saved in history.
	SuggestContinueAsNew bool
	HistorySizeBytes     int64
Are you trying to make history deterministic? I don't think it is necessary. First of all, SDKs can ignore these fields in the non-determinism detector, the same way they do for activity arguments. But even if they don't, the SDK will just replay history from the beginning, which is OK for workflows with continuously failing WTs. I think the SDK already does this (maybe not).
But you still need these fields just to pass this data around. All WT related fields from executions.proto must be here.
as we discussed: they can change across attempts, but we do need to keep them and can't just recompute them because of determinism. updated comment
@@ -614,7 +633,10 @@ func (m *workflowTaskStateMachine) DeleteWorkflowTask() {
 
 		TaskQueue: nil,
 		// Keep the last original scheduled Timestamp, so that AddWorkflowTaskScheduledEventAsHeartbeat can continue with it.
-		OriginalScheduledTime: m.getWorkflowTaskInfo().OriginalScheduledTime,
+		OriginalScheduledTime: m.ms.executionInfo.WorkflowTaskOriginalScheduledTime,
Please leave getWorkflowTaskInfo(). It will help me refactor the WT state machine in the future.
workflowTask.SuggestContinueAsNew, workflowTask.HistorySizeBytes = m.getHistorySizeInfo()
Why not compute it for every attempt?
If we compute it for every attempt, but only actually write the event to history on the first attempt, then the value used by the workflow may be different from the value written to history, so replay would cause a determinism error. Like 90% of the complexity of this PR is just for that
For the record, the answer here is that we write a new started event when the retried WFT completes, so as long as that new started event has the right values, we're good.
rebased!
	workflowTask.SuggestContinueAsNew,
	workflowTask.HistorySizeBytes,
I was a little unsure about these. I think it probably should recompute them here? (It seems logical that it should recompute every time hBuilder.AddWorkflowTaskStartedEvent is called, and not at any other time.) But I'm not sure how speculative workflow tasks work...
this is a failure, so it doesn't matter
	if stats == nil {
		return false, 0
	}
	// QUESTION: in some cases we might have history events in memory that we haven't written
simplified a little based on discussion
	// events. That's okay, it doesn't have to be 100% accurate. It just has to be kept
	// consistent between the started event in history and the event that was sent to the SDK
	// that resulted in the successful completion.
	suggestContinueAsNew, historySizeBytes := m.getHistorySizeInfo()
as discussed, just compute every time we get here
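A minimal sketch of the approach settled on here: recompute both values at the point the real WorkflowTaskStarted event is appended (including the one written when a retried, transient workflow task finally completes). The builder and attribute types below are simplified stand-ins, not the real history builder API:

```go
package sketch

type workflowTaskStartedAttributes struct {
	SuggestContinueAsNew bool
	HistorySizeBytes     int64
}

type historyBuilder struct {
	events []workflowTaskStartedAttributes
}

func (b *historyBuilder) AddWorkflowTaskStartedEvent(attrs workflowTaskStartedAttributes) {
	b.events = append(b.events, attrs)
}

// addStartedEvent computes suggestContinueAsNew/historySizeBytes right before the event
// is built, so the persisted event reflects the state at the time it is actually written.
func addStartedEvent(b *historyBuilder, getHistorySizeInfo func() (bool, int64)) {
	suggestContinueAsNew, historySizeBytes := getHistorySizeInfo()
	b.AddWorkflowTaskStartedEvent(workflowTaskStartedAttributes{
		SuggestContinueAsNew: suggestContinueAsNew,
		HistorySizeBytes:     historySizeBytes,
	})
}
```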
	// These two fields are sent to workers in the WorkflowTaskStarted event. We need to save a
	// copy in mutable state to know the last values we sent (which might have been in a
	// transient event), otherwise a dynamic config change of the suggestion threshold could
	// cause the WorkflowTaskStarted event that the worker used to not match the event we saved
	// in history.
I guess this comment also needs to be updated.
what part is wrong? I just updated this one
What changed?
This fills in HistorySizeBytes and SuggestContinueAsNew on WorkflowTaskStartedEventAttributes, added in temporalio/api#178.

Why?
So workflows can decide whether to continue-as-new with less guessing. Fixes #2726 and #1114.
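For context on the intended use, here is a hedged sketch of the workflow-side consumption via the Go SDK. The GetContinueAsNewSuggested accessor is the SDK surface that eventually exposes this event field; treat the exact method names as assumptions, since the SDK changes are outside this PR:

```go
// Illustrative only: a workflow loop that continues-as-new when the server suggests it.
package sample

import (
	"time"

	"go.temporal.io/sdk/workflow"
)

func LongRunningWorkflow(ctx workflow.Context, processed int) error {
	for {
		// Stand-in for one unit of work (normally an activity call).
		if err := workflow.Sleep(ctx, time.Minute); err != nil {
			return err
		}
		processed++

		// Assumed SDK accessor backed by the SuggestContinueAsNew field added here.
		if workflow.GetInfo(ctx).GetContinueAsNewSuggested() {
			// Carry state into a fresh run instead of letting history grow unbounded.
			return workflow.NewContinueAsNewError(ctx, LongRunningWorkflow, processed)
		}
	}
}
```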
How did you test it?
New integration test
Potential risks
Bugs in this logic could lead to inconsistency between the values sent in transient workflow tasks and values actually recorded in history, which could lead to determinism errors on replay.
Is hotfix candidate?
no