Skip to content

Commit

Permalink
History size and suggest continue as new (#269)
Browse files Browse the repository at this point in the history
* Track history size

* Add example integration test for continuing as new
  • Loading branch information
jeffschoner authored Oct 19, 2023
1 parent 0540942 commit 263e975
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 2 deletions.
1 change: 1 addition & 0 deletions examples/bin/worker
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ worker.register_workflow(CancellingTimerWorkflow)
worker.register_workflow(CheckWorkflow)
worker.register_workflow(ChildWorkflowTimeoutWorkflow)
worker.register_workflow(ChildWorkflowTerminatedWorkflow)
worker.register_workflow(ContinueAsNewWorkflow)
worker.register_workflow(FailingActivitiesWorkflow)
worker.register_workflow(FailingWorkflow)
worker.register_workflow(HandlingStructuredErrorWorkflow)
Expand Down
43 changes: 43 additions & 0 deletions examples/spec/integration/continue_as_new_spec.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
require 'workflows/continue_as_new_workflow'
require 'workflows/loop_workflow'

describe LoopWorkflow do
Expand Down Expand Up @@ -47,4 +48,46 @@
expect(final_result[:memo]).to eq(memo)
expect(final_result[:headers]).to eq(headers)
end

it 'uses history bytes size to continue as new' do
workflow_id = SecureRandom.uuid
# 7 activity invocations produce about 10,000 bytes of history. This should
# result in one continue as new with 7 activities in the first and 3 in the
# second run.
run_id = Temporal.start_workflow(
ContinueAsNewWorkflow,
10, # hello count
10_000, # max bytes limit
options: {
workflow_id: workflow_id,
timeouts: {
execution: 60,
run: 20
}
},
)

# First run will throw because it continued as new
next_run_id = nil
expect do
Temporal.await_workflow_result(
ContinueAsNewWorkflow,
workflow_id: workflow_id,
run_id: run_id,
)
end.to raise_error(Temporal::WorkflowRunContinuedAsNew) do |error|
next_run_id = error.new_run_id
end

expect(next_run_id).to_not eq(nil)

# Second run will not throw because it returns rather than continues as new.
final_result = Temporal.await_workflow_result(
ContinueAsNewWorkflow,
workflow_id: workflow_id,
run_id: next_run_id,
)

expect(final_result[:runs]).to eq(2)
end
end
19 changes: 19 additions & 0 deletions examples/workflows/continue_as_new_workflow.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
require 'activities/hello_world_activity'

# Demonstrates how to use history_size to determine when to continue as new
class ContinueAsNewWorkflow < Temporal::Workflow
def execute(hello_count, bytes_max, run = 1)
while hello_count.positive? && workflow.history_size.bytes < bytes_max
HelloWorldActivity.execute!("Alice Long#{'long' * 100}name")
hello_count -= 1
end

workflow.logger.info("Workflow history size: #{workflow.history_size}, remaining hellos: #{hello_count}")

return workflow.continue_as_new(hello_count, bytes_max, run + 1) if hello_count.positive?

{
runs: run
}
end
end
6 changes: 6 additions & 0 deletions lib/temporal/workflow/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ def has_release?(release_name)
state_manager.release?(release_name.to_s)
end

# Returns information about the workflow run's history up to this point. This can be used to
# determine when to continue as new.
def history_size
state_manager.history_size
end

def execute_activity(activity_class, *input, **args)
options = args.delete(:options) || {}
input << args unless args.empty?
Expand Down
11 changes: 11 additions & 0 deletions lib/temporal/workflow/history/size.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module Temporal
class Workflow
class History
Size = Struct.new(
:bytes, # integer, total number of bytes used
:events, # integer, total number of history events used
:suggest_continue_as_new, # boolean, true if server history length limits are being approached
keyword_init: true)
end
end
end
6 changes: 5 additions & 1 deletion lib/temporal/workflow/history/window.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ module Temporal
class Workflow
class History
class Window
attr_reader :local_time, :last_event_id, :events, :sdk_flags
attr_reader :local_time, :last_event_id, :events, :sdk_flags, :history_size_bytes, :suggest_continue_as_new

def initialize
@local_time = nil
@last_event_id = nil
@events = []
@replay = false
@sdk_flags = Set.new
@history_size_bytes = 0
@suggest_continue_as_new = false
end

def replay?
Expand All @@ -24,6 +26,8 @@ def add(event)
when 'WORKFLOW_TASK_STARTED'
@last_event_id = event.id + 1 # one for completed
@local_time = event.timestamp
@history_size_bytes = event.attributes.history_size_bytes
@suggest_continue_as_new = event.attributes.suggest_continue_as_new
when 'WORKFLOW_TASK_FAILED', 'WORKFLOW_TASK_TIMED_OUT'
@last_event_id = nil
@local_time = nil
Expand Down
11 changes: 11 additions & 0 deletions lib/temporal/workflow/state_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require 'temporal/workflow/command'
require 'temporal/workflow/command_state_machine'
require 'temporal/workflow/history/event_target'
require 'temporal/workflow/history/size'
require 'temporal/concerns/payloads'
require 'temporal/workflow/errors'
require 'temporal/workflow/sdk_flags'
Expand Down Expand Up @@ -90,6 +91,8 @@ def apply(history_window)
@local_time = history_window.local_time
@last_event_id = history_window.last_event_id
history_window.sdk_flags.each { |flag| sdk_flags.add(flag) }
@history_size_bytes = history_window.history_size_bytes
@suggest_continue_as_new = history_window.suggest_continue_as_new

order_events(history_window.events).each do |event|
apply_event(event)
Expand All @@ -116,6 +119,14 @@ def self.signal_event?(event)
event.type == 'WORKFLOW_EXECUTION_SIGNALED'
end

def history_size
History::Size.new(
events: @last_event_id,
bytes: @history_size_bytes,
suggest_continue_as_new: @suggest_continue_as_new
).freeze
end

private

attr_reader :dispatcher, :command_tracker, :marker_ids, :side_effects, :releases, :sdk_flags
Expand Down
5 changes: 4 additions & 1 deletion spec/fabricators/grpc/history_event_fabricator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,15 @@ class TestSerializer
end

Fabricator(:api_workflow_task_started_event, from: :api_history_event) do
transient :history_size_bytes, :suggest_continue_as_new
event_type { Temporalio::Api::Enums::V1::EventType::EVENT_TYPE_WORKFLOW_TASK_STARTED }
workflow_task_started_event_attributes do |attrs|
Temporalio::Api::History::V1::WorkflowTaskStartedEventAttributes.new(
scheduled_event_id: attrs[:event_id] - 1,
identity: 'test-worker@test-host',
request_id: SecureRandom.uuid
request_id: SecureRandom.uuid,
history_size_bytes: attrs[:history_size_bytes],
suggest_continue_as_new: attrs[:suggest_continue_as_new]
)
end
end
Expand Down
34 changes: 34 additions & 0 deletions spec/unit/lib/temporal/workflow/state_manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,40 @@ def test_order(signal_first)
end
end

describe '#history_size' do
let(:config) { Temporal::Configuration.new }
let(:history_size_bytes) { 27 }
let(:suggest_continue_as_new) { true }
let(:start_workflow_execution_event) { Fabricate(:api_workflow_execution_started_event) }
let(:workflow_task_scheduled_event) { Fabricate(:api_workflow_task_scheduled_event, event_id: 2) }
let(:workflow_task_started_event) do
Fabricate(
:api_workflow_task_started_event,
event_id: 3,
history_size_bytes: history_size_bytes,
suggest_continue_as_new: suggest_continue_as_new)
end

it 'has correct event count' do
state_manager = described_class.new(Temporal::Workflow::Dispatcher.new, config)

window = Temporal::Workflow::History::Window.new
window.add(Temporal::Workflow::History::Event.new(start_workflow_execution_event))
window.add(Temporal::Workflow::History::Event.new(workflow_task_scheduled_event))
window.add(Temporal::Workflow::History::Event.new(workflow_task_started_event))

state_manager.apply(window)

expect(state_manager.history_size).to eq(
Temporal::Workflow::History::Size.new(
events: 4, # comes from event id of started + 1
bytes: history_size_bytes,
suggest_continue_as_new: suggest_continue_as_new
)
)
end
end

describe '#search_attributes' do
let(:initial_search_attributes) do
{
Expand Down

0 comments on commit 263e975

Please sign in to comment.