diff --git a/lib/concurrent/timer_task.rb b/lib/concurrent/timer_task.rb index 8e66427c0..0f1cd1ecf 100644 --- a/lib/concurrent/timer_task.rb +++ b/lib/concurrent/timer_task.rb @@ -280,7 +280,8 @@ def ns_initialize(opts, &task) self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL self.timeout_interval = opts[:timeout] || opts[:timeout_interval] || TIMEOUT_INTERVAL @run_now = opts[:now] || opts[:run_now] - @executor = Concurrent::SafeTaskExecutor.new(task) + @task = task + @executor = Concurrent::RubySingleThreadExecutor.new() @running = Concurrent::AtomicBoolean.new(false) @value = nil @@ -309,13 +310,26 @@ def schedule_next_task(interval = execution_interval) def execute_task(completion) return nil unless @running.true? ScheduledTask.execute(timeout_interval, args: [completion], &method(:timeout_task)) - _success, value, reason = @executor.execute(self) + @thread_completed = Concurrent::Event.new + @value = @reason = nil + + @executor.post do + begin + @value = @task.call(self) + rescue Exception => ex + @reason = ex + ensure + @thread_completed.set + end + end + + @thread_completed.wait + if completion.try? - self.value = value schedule_next_task time = Time.now observers.notify_observers do - [time, self.value, reason] + [time, self.value, @reason] end end nil @@ -325,6 +339,10 @@ def execute_task(completion) def timeout_task(completion) return unless @running.true? if completion.try? + @executor.kill + @executor.wait_for_termination + @executor = Concurrent::RubySingleThreadExecutor.new() + @thread_completed.set self.value = value schedule_next_task observers.notify_observers(Time.now, nil, Concurrent::TimeoutError.new) diff --git a/spec/concurrent/timer_task_spec.rb b/spec/concurrent/timer_task_spec.rb index 6ed1cb597..7d1db309e 100644 --- a/spec/concurrent/timer_task_spec.rb +++ b/spec/concurrent/timer_task_spec.rb @@ -4,6 +4,21 @@ module Concurrent RSpec.describe TimerTask do + let(:observer) do + Class.new do + attr_reader :time + attr_reader :value + attr_reader :ex + attr_accessor :latch + define_method(:initialize) { @latch = CountDownLatch.new(1) } + define_method(:update) do |time, value, ex| + @time = time + @value = value + @ex = ex + @latch.count_down + end + end.new + end context :dereferenceable do @@ -212,26 +227,37 @@ def trigger_observable(observable) expect(expected).to eq subject subject.kill end - end - context 'observation' do + context "timeout" do + it 'should not timeout' do + subject = TimerTask.new(execution: 0.1, timeout: 1) { sleep(0.5); 42 } + subject.add_observer(observer) + subject.execute + observer.latch.wait(2) + expect(observer.ex).to be_nil + expect(observer.value).to eq(42) + subject.kill + end - let(:observer) do - Class.new do - attr_reader :time - attr_reader :value - attr_reader :ex - attr_reader :latch - define_method(:initialize) { @latch = CountDownLatch.new(1) } - define_method(:update) do |time, value, ex| - @time = time - @value = value - @ex = ex - @latch.count_down + it 'times out and kills the current task' do + observer.latch = CountDownLatch.new(2) + subject = TimerTask.new(execution: 0.1, timeout: 0.5, run_now: true) do + sleep(5) if observer.latch.count == 2 + 42 end - end.new + subject.add_observer(observer) + + subject.execute + observer.latch.wait(2) + + expect(observer.ex).to be_nil + expect(observer.value).to eq(42) + subject.kill + end end + end + context 'observation' do it 'notifies all observers on success' do subject = TimerTask.new(execution: 0.1) { 42 } subject.add_observer(observer)