Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Pablo Cantero committed Dec 6, 2016
1 parent 5a12f3f commit f1108e0
Show file tree
Hide file tree
Showing 8 changed files with 15 additions and 36 deletions.
6 changes: 1 addition & 5 deletions examples/default_worker.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
class DefaultWorker
include Shoryuken::Worker

shoryuken_options queue: 'test.fifo', auto_delete: true
shoryuken_options queue: 'default', auto_delete: true

def perform(sqs_msg, body)
Shoryuken.logger.info("Received message: '#{body}'")

raise body
end
end

10.times { |i| DefaultWorker.perform_async("#{rand(1000)}-#{i}}") }
1 change: 0 additions & 1 deletion lib/shoryuken/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
require 'shoryuken'
require 'concurrent-edge'


module Shoryuken
# See: https://github.com/mperham/sidekiq/blob/33f5d6b2b6c0dfaab11e5d39688cab7ebadc83ae/lib/sidekiq/cli.rb#L20
class Shutdown < Interrupt; end
Expand Down
1 change: 0 additions & 1 deletion lib/shoryuken/environment_loader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ def initialize_aws
unless Shoryuken.options[:aws].to_h.empty?
Shoryuken.logger.warn { '[DEPRECATION] aws in shoryuken.yml is deprecated. Please use configure_server and configure_client in your initializer' }
end

Shoryuken::AwsConfig.setup(Shoryuken.options[:aws])
end

Expand Down
5 changes: 0 additions & 5 deletions lib/shoryuken/fetcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@ class Fetcher < Concurrent::Actor::RestartingContext

FETCH_LIMIT = 10

def on_message(msg)
method, *args = msg
send(method, *args)
end

def initialize(manager)
@manager = manager
end
Expand Down
12 changes: 0 additions & 12 deletions lib/shoryuken/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@ module Shoryuken
class Manager < Concurrent::Actor::RestartingContext
include Util

def on_message(msg)
method, *args = msg
send(method, *args)
end

def initialize
@count = Shoryuken.options[:concurrency] || 25

Expand Down Expand Up @@ -140,13 +135,6 @@ def dispatch
end
end


def on_event(event)
if event == :reset
end
logger.info(event.inspect)
end

private

def dispatch_later
Expand Down
16 changes: 9 additions & 7 deletions lib/shoryuken/middleware/server/auto_extend_visibility.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ class AutoExtendVisibility
EXTEND_UPFRONT_SECONDS = 5

def call(worker, queue, sqs_msg, body)
# timer = auto_visibility_timer(worker, queue, sqs_msg, body)
timer = auto_visibility_timer(worker, queue, sqs_msg, body)
begin
yield
ensure
# if timer
# timer.cancel
# @visibility_extender.terminate
# end
if timer
timer.shutdown
@visibility_extender.ask!(:terminate)
end
end
end

Expand All @@ -24,7 +24,9 @@ class MessageVisibilityExtender < Concurrent::Actor::RestartingContext
def auto_extend(worker, queue, sqs_msg, body)
queue_visibility_timeout = Shoryuken::Client.queues(queue).visibility_timeout

every(queue_visibility_timeout - EXTEND_UPFRONT_SECONDS) do
interval = queue_visibility_timeout - EXTEND_UPFRONT_SECONDS

Concurrent::TimerTask.execute(execution_interval: interval, timeout_interval: interval) do
begin
logger.debug do
"Extending message #{worker_name(worker.class, sqs_msg, body)}/#{queue}/#{sqs_msg.message_id} " \
Expand All @@ -45,7 +47,7 @@ def auto_extend(worker, queue, sqs_msg, body)

def auto_visibility_timer(worker, queue, sqs_msg, body)
return unless worker.class.auto_visibility_timeout?
@visibility_extender = MessageVisibilityExtender.new_link
@visibility_extender = MessageVisibilityExtender.spawn! name: :message_visibility_extender, link: true
@visibility_extender.auto_extend(worker, queue, sqs_msg, body)
end
end
Expand Down
5 changes: 0 additions & 5 deletions lib/shoryuken/processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@ def initialize(manager)
@manager = manager
end

def on_message(msg)
method, *args = msg
send(method, *args)
end

def process(queue, sqs_msg)
worker = Shoryuken.worker_registry.fetch_worker(queue, sqs_msg)

Expand Down
5 changes: 5 additions & 0 deletions lib/shoryuken/util.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
module Shoryuken
module Util
def on_message(msg)
method, *args = msg
send(method, *args)
end

def watchdog(last_words)
yield
rescue => ex
Expand Down

0 comments on commit f1108e0

Please sign in to comment.