From 05b467b6dccaea130d8d9629d6dd65e93a618e1b Mon Sep 17 00:00:00 2001 From: Ash Tyndall Date: Thu, 15 Dec 2016 07:20:54 +0800 Subject: [PATCH 01/14] Add StrictPriority strategy --- lib/shoryuken/polling.rb | 146 +++++++++++++++++++++++++++++---- spec/shoryuken/polling_spec.rb | 139 +++++++++++++++++++++++++++++++ 2 files changed, 267 insertions(+), 18 deletions(-) diff --git a/lib/shoryuken/polling.rb b/lib/shoryuken/polling.rb index f6d6d24e..24cbaf37 100644 --- a/lib/shoryuken/polling.rb +++ b/lib/shoryuken/polling.rb @@ -19,11 +19,49 @@ def ==(other) end alias_method :eql?, :== + + def to_s + options.empty? ? name : super + end end - class WeightedRoundRobin + class BaseStrategy include Util + def next_queue + fail NotImplementedError + end + + def messages_found(queue, messages_found) + fail NotImplementedError + end + + def active_queues + fail NotImplementedError + end + + def ==(other) + case other + when Array + @queues == other + else + if other.respond_to?(:active_queues) + active_queues == other.active_queues + else + false + end + end + end + + private + + def delay + Shoryuken.options[:delay].to_f + end + end + + class WeightedRoundRobin < BaseStrategy + def initialize(queues) @initial_queues = queues @queues = queues.dup.uniq @@ -57,25 +95,8 @@ def active_queues unparse_queues(@queues) end - def ==(other) - case other - when Array - @queues == other - else - if other.respond_to?(:active_queues) - active_queues == other.active_queues - else - false - end - end - end - private - def delay - Shoryuken.options[:delay].to_f - end - def pause(queue) return unless @queues.delete(queue) @paused_queues << [Time.now + delay, queue] @@ -102,5 +123,94 @@ def queue_weight(queues, queue) queues.count { |q| q == queue } end end + + class StrictPriority < BaseStrategy + + def initialize(queues) + # Mapping of queues to priority values + @queue_priorities = queues + .each_with_object(Hash.new(0)) { |queue, h| h[queue] += 1 } + + # Priority ordering of the queues + @queue_order = @queue_priorities + .to_a + .sort_by { |queue, priority| -priority } + .map(&:first) + + # Pause status of the queues + @queue_status = @queue_order + .each_with_object({}) { |queue, h| h[queue] = [true, nil] } + + # Most recently used queue + @current_queue = nil + end + + def next_queue + unpause_queues + @current_queue = next_active_queue + return nil if @current_queue.nil? + QueueConfiguration.new(@current_queue, {}) + end + + def messages_found(queue, messages_found) + if messages_found == 0 + # If no messages are found, we pause a given queue + pause(queue) + else + # Reset the current queue when messages found to cause priorities to re-run + @current_queue = nil + end + end + + def active_queues + @queue_status + .select { |_, status| status.first } + .map { |queue, _| [queue, @queue_priorities[queue]] } + end + + private + + def next_active_queue + return nil unless @queue_order.length > 0 + + start = @current_queue.nil? ? 0 : @queue_order.index(@current_queue) + 1 + i = 0 + + # Loop through the queue order from the current queue until we find a + # queue that is next in line and is not paused + while true + queue = @queue_order[(start + i) % @queue_order.length] + active, delay = @queue_status[queue] + + i += 1 + return queue if active + return nil if i >= @queue_order.length # Prevents infinite looping + end + end + + def pause(queue) + return unless delay > 0 + @queue_status[queue] = [false, Time.now + delay] + logger.debug "Paused '#{queue}'" + end + + def unpause_queues + # Modifies queue statuses for queues that are now unpaused + @queue_status = @queue_status.each_with_object({}) do |e, h| + queue, status = e + active, delay = status + + h[queue] = if active + [true, nil] + elsif Time.now > delay + logger.debug "Unpaused '#{queue}'" + @current_queue = nil # Reset the check ordering on un-pause + [true, nil] + else + [false, delay] + end + end + end + end end end diff --git a/spec/shoryuken/polling_spec.rb b/spec/shoryuken/polling_spec.rb index 5a50b3bb..6293037a 100644 --- a/spec/shoryuken/polling_spec.rb +++ b/spec/shoryuken/polling_spec.rb @@ -98,3 +98,142 @@ end end end + +describe Shoryuken::Polling::StrictPriority do + let(:queue1) { 'shoryuken' } + let(:queue2) { 'uppercut' } + let(:queue3) { 'other' } + let(:queues) { Array.new } + subject { Shoryuken::Polling::StrictPriority.new(queues) } + + describe '#next_queue' do + it 'cycles when declared desc' do + # [shoryuken, 2] + # [uppercut, 1] + queues << queue1 + queues << queue1 + queues << queue2 + + expect(subject.next_queue).to eq(queue1) + expect(subject.next_queue).to eq(queue2) + expect(subject.next_queue).to eq(queue1) + expect(subject.next_queue).to eq(queue2) + end + + it 'cycles when declared asc' do + # [uppercut, 1] + # [shoryuken, 2] + queues << queue2 + queues << queue1 + queues << queue1 + + expect(subject.next_queue).to eq(queue1) + expect(subject.next_queue).to eq(queue2) + expect(subject.next_queue).to eq(queue1) + expect(subject.next_queue).to eq(queue2) + end + + it 'returns nil if there are no active queues' do + expect(subject.next_queue).to eq(nil) + end + + it 'unpauses queues whose pause is expired' do + # [shoryuken, 3] + # [uppercut, 2] + # [other, 1] + queues << queue1 + queues << queue1 + queues << queue1 + queues << queue2 + queues << queue2 + queues << queue3 + + allow(subject).to receive(:delay).and_return(10) + + now = Time.now + allow(Time).to receive(:now).and_return(now) + + # pause the second queue, see it loop between 1 and 3 + subject.messages_found(queue2, 0) + expect(subject.next_queue).to eq(queue1) + expect(subject.next_queue).to eq(queue3) + expect(subject.next_queue).to eq(queue1) + + now += 5 + allow(Time).to receive(:now).and_return(now) + + # pause the first queue, see it repeat 3 + subject.messages_found(queue1, 0) + expect(subject.next_queue).to eq(queue3) + expect(subject.next_queue).to eq(queue3) + + # pause the third queue, see it have nothing + subject.messages_found(queue3, 0) + expect(subject.next_queue).to eq(nil) + + # unpause queue 2 + now += 6 + allow(Time).to receive(:now).and_return(now) + expect(subject.next_queue).to eq(queue2) + + # unpause queues 1 and 3 + now += 6 + allow(Time).to receive(:now).and_return(now) + expect(subject.next_queue).to eq(queue1) + expect(subject.next_queue).to eq(queue2) + expect(subject.next_queue).to eq(queue3) + end + end + + describe '#messages_found' do + it 'pauses a queue if there are no messages found' do + # [shoryuken, 2] + # [uppercut, 1] + queues << queue1 + queues << queue1 + queues << queue2 + + expect(subject.active_queues).to eq([[queue1, 2], [queue2, 1]]) + expect(subject).to receive(:pause).with(queue1).and_call_original + subject.messages_found(queue1, 0) + expect(subject.active_queues).to eq([[queue2, 1]]) + end + + it 'continues to queue the highest priority queue if messages are found' do + # [shoryuken, 3] + # [uppercut, 2] + # [other, 1] + queues << queue1 + queues << queue1 + queues << queue1 + queues << queue2 + queues << queue2 + queues << queue3 + + expect(subject.next_queue).to eq(queue1) + subject.messages_found(queue1, 1) + expect(subject.next_queue).to eq(queue1) + subject.messages_found(queue1, 1) + expect(subject.next_queue).to eq(queue1) + end + + it 'resets the priorities if messages are found part way' do + # [shoryuken, 3] + # [uppercut, 2] + # [other, 1] + queues << queue1 + queues << queue1 + queues << queue1 + queues << queue2 + queues << queue2 + queues << queue3 + + expect(subject.next_queue).to eq(queue1) + expect(subject.next_queue).to eq(queue2) + subject.messages_found(queue2, 1) + expect(subject.next_queue).to eq(queue1) + expect(subject.next_queue).to eq(queue2) + expect(subject.next_queue).to eq(queue3) + end + end +end From 160bba7382a9d101941f6f49ddc7567109a14817 Mon Sep 17 00:00:00 2001 From: Ash Tyndall Date: Fri, 16 Dec 2016 09:33:39 +0800 Subject: [PATCH 02/14] Restructure strategy based on feedback to be more clear and concise --- lib/shoryuken/polling.rb | 94 ++++++++++++++++------------------------ 1 file changed, 38 insertions(+), 56 deletions(-) diff --git a/lib/shoryuken/polling.rb b/lib/shoryuken/polling.rb index 24cbaf37..461abbef 100644 --- a/lib/shoryuken/polling.rb +++ b/lib/shoryuken/polling.rb @@ -127,90 +127,72 @@ def queue_weight(queues, queue) class StrictPriority < BaseStrategy def initialize(queues) - # Mapping of queues to priority values - @queue_priorities = queues - .each_with_object(Hash.new(0)) { |queue, h| h[queue] += 1 } - - # Priority ordering of the queues - @queue_order = @queue_priorities - .to_a - .sort_by { |queue, priority| -priority } + # Priority ordering of the queues, highest priority first + @initial_order = queues + .group_by { |q| q } + .sort_by { |q, qs| -qs.count } .map(&:first) - # Pause status of the queues - @queue_status = @queue_order - .each_with_object({}) { |queue, h| h[queue] = [true, nil] } + # Stores the queue ordering with the next queue as first element + @queue_order = @initial_order.dup - # Most recently used queue - @current_queue = nil + # Pause status of the queues, default to past time (unpaused) + @paused_until = queues + .each_with_object(Hash.new) { |queue, h| h[queue] = Time.at(0) } end def next_queue - unpause_queues - @current_queue = next_active_queue - return nil if @current_queue.nil? - QueueConfiguration.new(@current_queue, {}) + next_queue = next_active_queue + next_queue.nil? ? nil : QueueConfiguration.new(next_queue, {}) end def messages_found(queue, messages_found) if messages_found == 0 # If no messages are found, we pause a given queue - pause(queue) + pause(queue) else - # Reset the current queue when messages found to cause priorities to re-run - @current_queue = nil + # Reset the queue order to the initial ordering + @queue_order = @initial_order.dup end end def active_queues - @queue_status - .select { |_, status| status.first } - .map { |queue, _| [queue, @queue_priorities[queue]] } + @paused_until + .reject { |_, unpause_at| unpause_at > Time.now } + .map { |queue, _| [queue, @initial_order.reverse.find_index(queue) + 1] } end private def next_active_queue - return nil unless @queue_order.length > 0 - - start = @current_queue.nil? ? 0 : @queue_order.index(@current_queue) + 1 - i = 0 - - # Loop through the queue order from the current queue until we find a - # queue that is next in line and is not paused - while true - queue = @queue_order[(start + i) % @queue_order.length] - active, delay = @queue_status[queue] - - i += 1 - return queue if active - return nil if i >= @queue_order.length # Prevents infinite looping + now = Time.now + + # Return nil if all queues are paused to prevent infinite loop + return nil if @paused_until.values.all? { |t| t > now } + + # If any queues have unpaused since the last time we checked, reset the ordering + if @last_check && @paused_until.values.any? { |t| t > @last_check && t <= now } + @queue_order = @initial_order.dup end + + @last_check = now + + # `rotate!` through the queue list until we find an unpaused queue + begin + next_queue = @queue_order.first + unpause_at = @paused_until[next_queue] + + @queue_order.rotate! + end while unpause_at > now + + next_queue end def pause(queue) return unless delay > 0 - @queue_status[queue] = [false, Time.now + delay] + @paused_until[queue] = Time.now + delay logger.debug "Paused '#{queue}'" end - - def unpause_queues - # Modifies queue statuses for queues that are now unpaused - @queue_status = @queue_status.each_with_object({}) do |e, h| - queue, status = e - active, delay = status - - h[queue] = if active - [true, nil] - elsif Time.now > delay - logger.debug "Unpaused '#{queue}'" - @current_queue = nil # Reset the check ordering on un-pause - [true, nil] - else - [false, delay] - end - end - end end end end From 5cb87b0a1a40aa8ea46ba03c85dee038f041b859 Mon Sep 17 00:00:00 2001 From: Ash Tyndall Date: Mon, 19 Dec 2016 08:15:44 +0800 Subject: [PATCH 03/14] Better QueueConfiguration options to_s --- lib/shoryuken/polling.rb | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/shoryuken/polling.rb b/lib/shoryuken/polling.rb index 461abbef..5aa8fdaa 100644 --- a/lib/shoryuken/polling.rb +++ b/lib/shoryuken/polling.rb @@ -21,7 +21,11 @@ def ==(other) alias_method :eql?, :== def to_s - options.empty? ? name : super + if options.empty? + name + else + "#" + end end end From 37256ae2fdcf427b2ca07008f25e5a24d5ba651a Mon Sep 17 00:00:00 2001 From: Ash Tyndall Date: Mon, 19 Dec 2016 08:19:50 +0800 Subject: [PATCH 04/14] Unused variable change to _ --- lib/shoryuken/polling.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/shoryuken/polling.rb b/lib/shoryuken/polling.rb index 5aa8fdaa..90c968c5 100644 --- a/lib/shoryuken/polling.rb +++ b/lib/shoryuken/polling.rb @@ -134,7 +134,7 @@ def initialize(queues) # Priority ordering of the queues, highest priority first @initial_order = queues .group_by { |q| q } - .sort_by { |q, qs| -qs.count } + .sort_by { |_, qs| -qs.count } .map(&:first) # Stores the queue ordering with the next queue as first element From 298f5607a3da1bd0e1db8f40653bfdc768d35de5 Mon Sep 17 00:00:00 2001 From: Ash Tyndall Date: Mon, 19 Dec 2016 08:20:26 +0800 Subject: [PATCH 05/14] Restructure to use an indexing approach with more submethods --- lib/shoryuken/polling.rb | 54 +++++++++++++++++++++------------------- 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/lib/shoryuken/polling.rb b/lib/shoryuken/polling.rb index 90c968c5..5ba31dd0 100644 --- a/lib/shoryuken/polling.rb +++ b/lib/shoryuken/polling.rb @@ -132,17 +132,17 @@ class StrictPriority < BaseStrategy def initialize(queues) # Priority ordering of the queues, highest priority first - @initial_order = queues + @queues = queues .group_by { |q| q } .sort_by { |_, qs| -qs.count } .map(&:first) - # Stores the queue ordering with the next queue as first element - @queue_order = @initial_order.dup - # Pause status of the queues, default to past time (unpaused) @paused_until = queues .each_with_object(Hash.new) { |queue, h| h[queue] = Time.at(0) } + + # Start queues at 0 + reset_next_queue end def next_queue @@ -152,44 +152,48 @@ def next_queue def messages_found(queue, messages_found) if messages_found == 0 - # If no messages are found, we pause a given queue pause(queue) else - # Reset the queue order to the initial ordering - @queue_order = @initial_order.dup + reset_next_queue end end def active_queues - @paused_until - .reject { |_, unpause_at| unpause_at > Time.now } - .map { |queue, _| [queue, @initial_order.reverse.find_index(queue) + 1] } + @queues + .reverse + .map.with_index(1) + .reject { |q, _| queue_paused?(q) } + .reverse end private def next_active_queue - now = Time.now + reset_next_queue if queues_unpaused_since? - # Return nil if all queues are paused to prevent infinite loop - return nil if @paused_until.values.all? { |t| t > now } - - # If any queues have unpaused since the last time we checked, reset the ordering - if @last_check && @paused_until.values.any? { |t| t > @last_check && t <= now } - @queue_order = @initial_order.dup + size = @queues.length + size.times do + queue = @queues[@next_queue_index] + @next_queue_index = (@next_queue_index + 1) % size + return queue unless queue_paused?(queue) end - @last_check = now + return nil + end - # `rotate!` through the queue list until we find an unpaused queue - begin - next_queue = @queue_order.first - unpause_at = @paused_until[next_queue] + def queues_unpaused_since? + last = @last_unpause_check + now = @last_unpause_check = Time.now - @queue_order.rotate! - end while unpause_at > now + last && @paused_until.values.any? { |t| t > last && t <= now } + end + + def reset_next_queue + @next_queue_index = 0 + end - next_queue + def queue_paused?(queue) + @paused_until[queue] > Time.now end def pause(queue) From cd17fb12464b4019e0316e489b2b456959cfd347 Mon Sep 17 00:00:00 2001 From: Pablo Cantero Date: Tue, 20 Dec 2016 23:10:10 -0200 Subject: [PATCH 06/14] Fix loading `logfile` from shoryuken.yml See https://github.com/phstc/shoryuken/issues/282#issuecomment-266619546 --- lib/shoryuken/environment_loader.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/shoryuken/environment_loader.rb b/lib/shoryuken/environment_loader.rb index ab5f6a63..4b705937 100644 --- a/lib/shoryuken/environment_loader.rb +++ b/lib/shoryuken/environment_loader.rb @@ -60,8 +60,8 @@ def initialize_aws end def initialize_logger - Shoryuken::Logging.initialize_logger(options[:logfile]) if options[:logfile] - Shoryuken.logger.level = Logger::DEBUG if options[:verbose] + Shoryuken::Logging.initialize_logger(Shoryuken.options[:logfile]) if Shoryuken.options[:logfile] + Shoryuken.logger.level = Logger::DEBUG if Shoryuken.options[:verbose] end def load_rails From 3dce8d38376f0c315b2d9f187a36e902d6fc037e Mon Sep 17 00:00:00 2001 From: Pablo Cantero Date: Thu, 22 Dec 2016 22:10:03 -0200 Subject: [PATCH 07/14] Bump version to 2.1.2 --- CHANGELOG.md | 19 +++++++++++++++++++ lib/shoryuken/version.rb | 2 +- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c447c3a..436d8f66 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,22 @@ +## [v2.1.2] - 2016-12-22 +- Fix loading `logfile` from shoryuken.yml + - [#296](https://github.com/phstc/shoryuken/pull/296) + +- Add support for Strict priority polling (pending documentation) + - [#288](https://github.com/phstc/shoryuken/pull/288) + +- Add `test_workers` for end-to-end testing supporting + - [#286](https://github.com/phstc/shoryuken/pull/286) + +- Update README documenting `configure_client` and `configure_server` + - [#283](https://github.com/phstc/shoryuken/pull/283) + +- Fix memory leak caused by async tracking busy threads + - [#289](https://github.com/phstc/shoryuken/pull/289) + +- Refactor fetcher, polling strategy and manager + - [#284](https://github.com/phstc/shoryuken/pull/284) + ## [v2.1.1] - 2016-12-05 - Fix aws deprecation warning message - [#279](https://github.com/phstc/shoryuken/pull/279) diff --git a/lib/shoryuken/version.rb b/lib/shoryuken/version.rb index 9adbe10f..8a4181eb 100644 --- a/lib/shoryuken/version.rb +++ b/lib/shoryuken/version.rb @@ -1,3 +1,3 @@ module Shoryuken - VERSION = '2.1.1' + VERSION = '2.1.2' end From 025fc83ee695d23429715c2e4438bdd610252584 Mon Sep 17 00:00:00 2001 From: Pablo Cantero Date: Mon, 9 Jan 2017 11:34:32 -0200 Subject: [PATCH 08/14] Show a warn message when batch isn't supported --- lib/shoryuken/middleware/server/auto_extend_visibility.rb | 8 ++++++++ .../middleware/server/exponential_backoff_retry.rb | 5 +++++ 2 files changed, 13 insertions(+) diff --git a/lib/shoryuken/middleware/server/auto_extend_visibility.rb b/lib/shoryuken/middleware/server/auto_extend_visibility.rb index f8e52ce5..33c98b7f 100644 --- a/lib/shoryuken/middleware/server/auto_extend_visibility.rb +++ b/lib/shoryuken/middleware/server/auto_extend_visibility.rb @@ -4,10 +4,18 @@ module Shoryuken module Middleware module Server class AutoExtendVisibility + include Util + EXTEND_UPFRONT_SECONDS = 5 def call(worker, queue, sqs_msg, body) + if sqs_msg.is_a?(Array) + logger.warn { "Auto extend visibility isn't supported for batch workers" } + return yield + end + timer = auto_visibility_timer(worker, queue, sqs_msg, body) + begin yield ensure diff --git a/lib/shoryuken/middleware/server/exponential_backoff_retry.rb b/lib/shoryuken/middleware/server/exponential_backoff_retry.rb index 6254f108..dc828be8 100755 --- a/lib/shoryuken/middleware/server/exponential_backoff_retry.rb +++ b/lib/shoryuken/middleware/server/exponential_backoff_retry.rb @@ -5,6 +5,11 @@ class ExponentialBackoffRetry include Util def call(worker, queue, sqs_msg, body) + if sqs_msg.is_a?(Array) + logger.warn { "Exponential backoff isn't supported for batch workers" } + return yield + end + started_at = Time.now yield rescue From cc2a7f2be21a8ad36668a73568cf897e76c7c557 Mon Sep 17 00:00:00 2001 From: Pablo Cantero Date: Tue, 10 Jan 2017 09:54:58 -0200 Subject: [PATCH 09/14] Remove nokogiri dependency. It's failing with Ruby 2.0.0 (See: https://travis-ci.org/phstc/shoryuken/jobs/190582221), we could def fix a version that works with Ruby 2.0.0, but TBH I don't think it's a good idea to add a such heavy dependendecy for something not officially suppported. --- shoryuken.gemspec | 1 - spec/spec_helper.rb | 9 --------- 2 files changed, 10 deletions(-) diff --git a/shoryuken.gemspec b/shoryuken.gemspec index 1c976f89..5778637f 100644 --- a/shoryuken.gemspec +++ b/shoryuken.gemspec @@ -21,7 +21,6 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'rake' spec.add_development_dependency 'rspec' spec.add_development_dependency 'pry-byebug' - spec.add_development_dependency 'nokogiri' spec.add_development_dependency 'dotenv' spec.add_dependency 'aws-sdk-core', '~> 2' diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index a04e4765..e7ed56d8 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -21,15 +21,6 @@ Shoryuken.logger.level = Logger::UNKNOWN Celluloid.logger.level = Logger::UNKNOWN -# I'm not sure whether this is an issue specific to running Shoryuken against github.com/comcast/cmb -# as opposed to AWS itself, but sometimes the receive_messages call returns XML that looks like this: -# -# \n\t\n\t ... -# -# The default MultiXML parser is ReXML, which seems to mishandle \n\t chars. Nokogiri seems to be -# the only one that correctly ignore this whitespace. -MultiXml.parser = :nokogiri - class TestWorker include Shoryuken::Worker From 03091d7936c2fcab77e4bae16459791fd8f28b70 Mon Sep 17 00:00:00 2001 From: Pablo Cantero Date: Tue, 10 Jan 2017 09:59:39 -0200 Subject: [PATCH 10/14] Remove multi_xml, it does not seem to be used --- spec/spec_helper.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index e7ed56d8..99b96060 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -5,7 +5,6 @@ require 'celluloid/current' require 'shoryuken' require 'json' -require 'multi_xml' require 'dotenv' Dotenv.load From 4e07a86075e3db14df6405b1fb48e0b854a69fab Mon Sep 17 00:00:00 2001 From: Pablo Cantero Date: Wed, 11 Jan 2017 08:45:19 -0200 Subject: [PATCH 11/14] Add tests for 025fc83 --- .../middleware/server/auto_extend_visibility_spec.rb | 8 +++++++- .../middleware/server/exponential_backoff_retry_spec.rb | 8 +++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/spec/shoryuken/middleware/server/auto_extend_visibility_spec.rb b/spec/shoryuken/middleware/server/auto_extend_visibility_spec.rb index 080989e2..41bddc28 100644 --- a/spec/shoryuken/middleware/server/auto_extend_visibility_spec.rb +++ b/spec/shoryuken/middleware/server/auto_extend_visibility_spec.rb @@ -1,6 +1,6 @@ require 'spec_helper' -describe Shoryuken::Middleware::Server::AutoExtendVisibility do +RSpec.describe Shoryuken::Middleware::Server::AutoExtendVisibility do let(:queue) { 'default' } let(:visibility_timeout) { 3 } let(:extend_upfront) { 1 } @@ -34,6 +34,12 @@ def run_and_raise(worker, queue, sqs_msg, error_class) stub_const('Shoryuken::Middleware::Server::AutoExtendVisibility::EXTEND_UPFRONT_SECONDS', extend_upfront) end + context 'when batch worker' do + it 'yields' do + expect { |b| subject.call(nil, nil, [], nil, &b) }.to yield_control + end + end + it 'extends message visibility if jobs takes a long time' do TestWorker.get_shoryuken_options['auto_visibility_timeout'] = true diff --git a/spec/shoryuken/middleware/server/exponential_backoff_retry_spec.rb b/spec/shoryuken/middleware/server/exponential_backoff_retry_spec.rb index c7abc3d0..3dd9caf6 100755 --- a/spec/shoryuken/middleware/server/exponential_backoff_retry_spec.rb +++ b/spec/shoryuken/middleware/server/exponential_backoff_retry_spec.rb @@ -1,6 +1,6 @@ require 'spec_helper' -describe Shoryuken::Middleware::Server::ExponentialBackoffRetry do +RSpec.describe Shoryuken::Middleware::Server::ExponentialBackoffRetry do let(:queue) { 'default' } let(:sqs_queue) { double Shoryuken::Queue } let(:sqs_msg) { double Shoryuken::Message, queue_url: queue, body: 'test', receipt_handle: SecureRandom.uuid, @@ -10,6 +10,12 @@ allow(Shoryuken::Client).to receive(:queues).with(queue).and_return(sqs_queue) end + context 'when batch worker' do + it 'yields' do + expect { |b| subject.call(nil, nil, [], nil, &b) }.to yield_control + end + end + context 'when a job succeeds' do it 'does not retry the job' do TestWorker.get_shoryuken_options['retry_intervals'] = [300, 1800] From 4214db5899fbee7b482b35d206d1a14c34be149c Mon Sep 17 00:00:00 2001 From: Carl Allen Date: Thu, 19 Jan 2017 09:13:02 -0600 Subject: [PATCH 12/14] Require Celluloid 17 https://github.com/phstc/shoryuken/blob/master/lib/shoryuken/cli.rb#L72 will fail for celluloid 0.16 as `celluloid/current` does not exist --- shoryuken.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shoryuken.gemspec b/shoryuken.gemspec index 5778637f..257ca27e 100644 --- a/shoryuken.gemspec +++ b/shoryuken.gemspec @@ -24,5 +24,5 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'dotenv' spec.add_dependency 'aws-sdk-core', '~> 2' - spec.add_dependency 'celluloid', '~> 0.16' + spec.add_dependency 'celluloid', '~> 0.17' end From d6b16656745064a221ca8b01237191e6660cd038 Mon Sep 17 00:00:00 2001 From: Pablo Cantero Date: Wed, 25 Jan 2017 13:07:23 -0500 Subject: [PATCH 13/14] Fix excessive logging when 0 messages found Fixes #306 See also: https://github.com/phstc/shoryuken/pull/291#discussion_r95239873 --- lib/shoryuken/fetcher.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/shoryuken/fetcher.rb b/lib/shoryuken/fetcher.rb index 48b7d227..780f4649 100644 --- a/lib/shoryuken/fetcher.rb +++ b/lib/shoryuken/fetcher.rb @@ -14,7 +14,7 @@ def fetch(queue, available_processors) limit = available_processors > FETCH_LIMIT ? FETCH_LIMIT : available_processors sqs_msgs = Array(receive_messages(queue, limit)) - logger.info { "Found #{sqs_msgs.size} messages for '#{queue.name}'" } + logger.info { "Found #{sqs_msgs.size} messages for '#{queue.name}'" } if !sqs_msgs.empty? logger.debug { "Fetcher for '#{queue}' completed in #{elapsed(started_at)} ms" } sqs_msgs rescue => ex From 087b1756c9dc294ab3b51259888a719cf13b7adb Mon Sep 17 00:00:00 2001 From: Pablo Cantero Date: Fri, 27 Jan 2017 14:14:38 -0500 Subject: [PATCH 14/14] Bump to 2.1.3 --- CHANGELOG.md | 10 ++++++++++ lib/shoryuken/version.rb | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 436d8f66..6514a0ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +## [v2.1.3] - 2017-01-27 +- Show a warn message when batch isn't supported + - [#302](https://github.com/phstc/shoryuken/pull/302) + +- Require Celluloid ~> 17 + - [#305](https://github.com/phstc/shoryuken/pull/305) + +- Fix excessive logging when 0 messages found + - [#307](https://github.com/phstc/shoryuken/pull/307) + ## [v2.1.2] - 2016-12-22 - Fix loading `logfile` from shoryuken.yml - [#296](https://github.com/phstc/shoryuken/pull/296) diff --git a/lib/shoryuken/version.rb b/lib/shoryuken/version.rb index 8a4181eb..e9bd2fd1 100644 --- a/lib/shoryuken/version.rb +++ b/lib/shoryuken/version.rb @@ -1,3 +1,3 @@ module Shoryuken - VERSION = '2.1.2' + VERSION = '2.1.3' end