Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

History size and suggest continue as new #269

Merged
merged 2 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/bin/worker
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ worker.register_workflow(CancellingTimerWorkflow)
worker.register_workflow(CheckWorkflow)
worker.register_workflow(ChildWorkflowTimeoutWorkflow)
worker.register_workflow(ChildWorkflowTerminatedWorkflow)
worker.register_workflow(ContinueAsNewWorkflow)
worker.register_workflow(FailingActivitiesWorkflow)
worker.register_workflow(FailingWorkflow)
worker.register_workflow(HandlingStructuredErrorWorkflow)
Expand Down
43 changes: 43 additions & 0 deletions examples/spec/integration/continue_as_new_spec.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
require 'workflows/continue_as_new_workflow'
require 'workflows/loop_workflow'

describe LoopWorkflow do
Expand Down Expand Up @@ -47,4 +48,46 @@
expect(final_result[:memo]).to eq(memo)
expect(final_result[:headers]).to eq(headers)
end

it 'uses history bytes size to continue as new' do
workflow_id = SecureRandom.uuid
# 7 activity invocations produce about 10,000 bytes of history. This should
# result in one continue as new with 7 activities in the first and 3 in the
# second run.
run_id = Temporal.start_workflow(
ContinueAsNewWorkflow,
10, # hello count
10_000, # max bytes limit
options: {
workflow_id: workflow_id,
timeouts: {
execution: 60,
run: 20
}
},
)

# First run will throw because it continued as new
next_run_id = nil
expect do
Temporal.await_workflow_result(
ContinueAsNewWorkflow,
workflow_id: workflow_id,
run_id: run_id,
)
end.to raise_error(Temporal::WorkflowRunContinuedAsNew) do |error|
next_run_id = error.new_run_id
end

expect(next_run_id).to_not eq(nil)

# Second run will not throw because it returns rather than continues as new.
final_result = Temporal.await_workflow_result(
ContinueAsNewWorkflow,
workflow_id: workflow_id,
run_id: next_run_id,
)

expect(final_result[:runs]).to eq(2)
end
end
19 changes: 19 additions & 0 deletions examples/workflows/continue_as_new_workflow.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
require 'activities/hello_world_activity'

# Demonstrates how to use history_size to determine when to continue as new
class ContinueAsNewWorkflow < Temporal::Workflow
def execute(hello_count, bytes_max, run = 1)
while hello_count.positive? && workflow.history_size.bytes < bytes_max
HelloWorldActivity.execute!("Alice Long#{'long' * 100}name")
hello_count -= 1
end

workflow.logger.info("Workflow history size: #{workflow.history_size}, remaining hellos: #{hello_count}")

return workflow.continue_as_new(hello_count, bytes_max, run + 1) if hello_count.positive?

{
runs: run
}
end
end
6 changes: 6 additions & 0 deletions lib/temporal/workflow/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ def has_release?(release_name)
state_manager.release?(release_name.to_s)
end

# Returns information about the workflow run's history up to this point. This can be used to
# determine when to continue as new.
def history_size
state_manager.history_size
end

def execute_activity(activity_class, *input, **args)
options = args.delete(:options) || {}
input << args unless args.empty?
Expand Down
11 changes: 11 additions & 0 deletions lib/temporal/workflow/history/size.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module Temporal
class Workflow
class History
Size = Struct.new(
:bytes, # integer, total number of bytes used
:events, # integer, total number of history events used
:suggest_continue_as_new, # boolean, true if server history length limits are being approached
keyword_init: true)
end
end
end
6 changes: 5 additions & 1 deletion lib/temporal/workflow/history/window.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ module Temporal
class Workflow
class History
class Window
attr_reader :local_time, :last_event_id, :events, :sdk_flags
attr_reader :local_time, :last_event_id, :events, :sdk_flags, :history_size_bytes, :suggest_continue_as_new

def initialize
@local_time = nil
@last_event_id = nil
@events = []
@replay = false
@sdk_flags = Set.new
@history_size_bytes = 0
@suggest_continue_as_new = false
end

def replay?
Expand All @@ -24,6 +26,8 @@ def add(event)
when 'WORKFLOW_TASK_STARTED'
@last_event_id = event.id + 1 # one for completed
@local_time = event.timestamp
@history_size_bytes = event.attributes.history_size_bytes
@suggest_continue_as_new = event.attributes.suggest_continue_as_new
when 'WORKFLOW_TASK_FAILED', 'WORKFLOW_TASK_TIMED_OUT'
@last_event_id = nil
@local_time = nil
Expand Down
11 changes: 11 additions & 0 deletions lib/temporal/workflow/state_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require 'temporal/workflow/command'
require 'temporal/workflow/command_state_machine'
require 'temporal/workflow/history/event_target'
require 'temporal/workflow/history/size'
require 'temporal/concerns/payloads'
require 'temporal/workflow/errors'
require 'temporal/workflow/sdk_flags'
Expand Down Expand Up @@ -90,6 +91,8 @@ def apply(history_window)
@local_time = history_window.local_time
@last_event_id = history_window.last_event_id
history_window.sdk_flags.each { |flag| sdk_flags.add(flag) }
@history_size_bytes = history_window.history_size_bytes
@suggest_continue_as_new = history_window.suggest_continue_as_new

order_events(history_window.events).each do |event|
apply_event(event)
Expand All @@ -116,6 +119,14 @@ def self.signal_event?(event)
event.type == 'WORKFLOW_EXECUTION_SIGNALED'
end

def history_size
History::Size.new(
events: @last_event_id,
bytes: @history_size_bytes,
suggest_continue_as_new: @suggest_continue_as_new
).freeze
end

private

attr_reader :dispatcher, :command_tracker, :marker_ids, :side_effects, :releases, :sdk_flags
Expand Down
5 changes: 4 additions & 1 deletion spec/fabricators/grpc/history_event_fabricator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,15 @@ class TestSerializer
end

Fabricator(:api_workflow_task_started_event, from: :api_history_event) do
transient :history_size_bytes, :suggest_continue_as_new
event_type { Temporalio::Api::Enums::V1::EventType::EVENT_TYPE_WORKFLOW_TASK_STARTED }
workflow_task_started_event_attributes do |attrs|
Temporalio::Api::History::V1::WorkflowTaskStartedEventAttributes.new(
scheduled_event_id: attrs[:event_id] - 1,
identity: 'test-worker@test-host',
request_id: SecureRandom.uuid
request_id: SecureRandom.uuid,
history_size_bytes: attrs[:history_size_bytes],
suggest_continue_as_new: attrs[:suggest_continue_as_new]
)
end
end
Expand Down
34 changes: 34 additions & 0 deletions spec/unit/lib/temporal/workflow/state_manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,40 @@ def test_order(signal_first)
end
end

describe '#history_size' do
let(:config) { Temporal::Configuration.new }
let(:history_size_bytes) { 27 }
let(:suggest_continue_as_new) { true }
let(:start_workflow_execution_event) { Fabricate(:api_workflow_execution_started_event) }
let(:workflow_task_scheduled_event) { Fabricate(:api_workflow_task_scheduled_event, event_id: 2) }
let(:workflow_task_started_event) do
Fabricate(
:api_workflow_task_started_event,
event_id: 3,
history_size_bytes: history_size_bytes,
suggest_continue_as_new: suggest_continue_as_new)
end

it 'has correct event count' do
state_manager = described_class.new(Temporal::Workflow::Dispatcher.new, config)

window = Temporal::Workflow::History::Window.new
window.add(Temporal::Workflow::History::Event.new(start_workflow_execution_event))
window.add(Temporal::Workflow::History::Event.new(workflow_task_scheduled_event))
window.add(Temporal::Workflow::History::Event.new(workflow_task_started_event))

state_manager.apply(window)

expect(state_manager.history_size).to eq(
Temporal::Workflow::History::Size.new(
events: 4, # comes from event id of started + 1
bytes: history_size_bytes,
suggest_continue_as_new: suggest_continue_as_new
)
)
end
end

describe '#search_attributes' do
let(:initial_search_attributes) do
{
Expand Down