Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplement Timeout.timeout with a single thread and a Queue #15

Merged
merged 5 commits into from
May 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
name: ubuntu
name: test

on: [push, pull_request]

jobs:
build:
name: build (${{ matrix.ruby }} / ${{ matrix.os }})
strategy:
fail-fast: false
matrix:
ruby: [ '3.0', 2.7, 2.6, 2.5, 2.4, head ]
ruby: [ '3.0', 2.7, 2.6, 2.5, 2.4, head, jruby, truffleruby-head ]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

os: [ ubuntu-latest, macos-latest ]
runs-on: ${{ matrix.os }}
steps:
Expand Down
134 changes: 97 additions & 37 deletions lib/timeout.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# frozen_string_literal: false
# frozen_string_literal: true
# Timeout long-running blocks
#
# == Synopsis
Expand All @@ -23,7 +23,7 @@
# Copyright:: (C) 2000 Information-technology Promotion Agency, Japan

module Timeout
VERSION = "0.2.0".freeze
VERSION = "0.2.0"

# Raised by Timeout.timeout when the block times out.
class Error < RuntimeError
Expand All @@ -50,9 +50,88 @@ def exception(*)
end

# :stopdoc:
THIS_FILE = /\A#{Regexp.quote(__FILE__)}:/o
CALLER_OFFSET = ((c = caller[0]) && THIS_FILE =~ c) ? 1 : 0
private_constant :THIS_FILE, :CALLER_OFFSET
CONDVAR = ConditionVariable.new
QUEUE = Queue.new
QUEUE_MUTEX = Mutex.new
TIMEOUT_THREAD_MUTEX = Mutex.new
@timeout_thread = nil
private_constant :CONDVAR, :QUEUE, :QUEUE_MUTEX, :TIMEOUT_THREAD_MUTEX

class Request
attr_reader :deadline

def initialize(thread, timeout, exception_class, message)
@thread = thread
@deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
@exception_class = exception_class
@message = message

@mutex = Mutex.new
@done = false # protected by @mutex
end

def done?
@mutex.synchronize do
@done
end
end

def expired?(now)
now >= @deadline
end

def interrupt
@mutex.synchronize do
unless @done
@thread.raise @exception_class, @message
@done = true
end
end
end

def finished
@mutex.synchronize do
@done = true
end
end
end
private_constant :Request

def self.create_timeout_thread
Thread.new do
requests = []
while true
until QUEUE.empty? and !requests.empty? # wait to have at least one request
req = QUEUE.pop
requests << req unless req.done?
end
closest_deadline = requests.min_by(&:deadline).deadline

now = 0.0
QUEUE_MUTEX.synchronize do
while (now = Process.clock_gettime(Process::CLOCK_MONOTONIC)) < closest_deadline and QUEUE.empty?
CONDVAR.wait(QUEUE_MUTEX, closest_deadline - now)
end
end

requests.each do |req|
req.interrupt if req.expired?(now)
end
requests.reject!(&:done?)
end
end
end
private_class_method :create_timeout_thread

def self.ensure_timeout_thread_created
unless @timeout_thread and @timeout_thread.alive?
TIMEOUT_THREAD_MUTEX.synchronize do
unless @timeout_thread and @timeout_thread.alive?
@timeout_thread = create_timeout_thread
end
end
end
end
# :startdoc:

# Perform an operation in a block, raising an error if it takes longer than
Expand Down Expand Up @@ -83,51 +162,32 @@ def exception(*)
def timeout(sec, klass = nil, message = nil, &block) #:yield: +sec+
return yield(sec) if sec == nil or sec.zero?

message ||= "execution expired".freeze
message ||= "execution expired"

if Fiber.respond_to?(:current_scheduler) && (scheduler = Fiber.current_scheduler)&.respond_to?(:timeout_after)
return scheduler.timeout_after(sec, klass || Error, message, &block)
end

from = "from #{caller_locations(1, 1)[0]}" if $DEBUG
e = Error
bl = proc do |exception|
Timeout.ensure_timeout_thread_created
perform = Proc.new do |exc|
request = Request.new(Thread.current, sec, exc, message)
QUEUE_MUTEX.synchronize do
QUEUE << request
CONDVAR.signal
end
begin
x = Thread.current
y = Thread.start {
Thread.current.name = from
begin
sleep sec
rescue => e
x.raise e
else
x.raise exception, message
end
}
return yield(sec)
ensure
if y
y.kill
y.join # make sure y is dead.
end
request.finished
end
end

if klass
begin
bl.call(klass)
rescue klass => e
message = e.message
bt = e.backtrace
end
perform.call(klass)
else
bt = Error.catch(message, &bl)
backtrace = Error.catch(&perform)
raise Error, message, backtrace
end
level = -caller(CALLER_OFFSET).size-2
while THIS_FILE =~ bt[level]
bt.delete_at(level)
end
raise(e, message, bt)
end

module_function :timeout
end
33 changes: 33 additions & 0 deletions test/test_timeout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ def test_non_timing_out_code_is_successful
end
end

def test_included
c = Class.new do
include Timeout
def test
timeout(1) { :ok }
end
end
assert_nothing_raised do
assert_equal :ok, c.new.test
end
end

def test_yield_param
assert_equal [5, :ok], Timeout.timeout(5){|s| [s, :ok] }
end
Expand Down Expand Up @@ -43,6 +55,7 @@ def test_skip_rescue
begin
sleep 3
rescue Exception => e
flunk "should not see any exception but saw #{e.inspect}"
end
end
end
Expand Down Expand Up @@ -126,4 +139,24 @@ def test_handle_interrupt
}
assert(ok, bug11344)
end

def test_fork
omit 'fork not supported' unless Process.respond_to?(:fork)
r, w = IO.pipe
pid = fork do
r.close
begin
r = Timeout.timeout(0.01) { sleep 5 }
w.write r.inspect
rescue Timeout::Error
w.write 'timeout'
ensure
w.close
end
end
w.close
Process.wait pid
assert_equal 'timeout', r.read
r.close
end
end