Skip to content

Commit

Permalink
Merge pull request #78 from miha-plesko/remove-raise-from-callbacks
Browse files Browse the repository at this point in the history
Don't raise exceptions from within AMQP callbacks
(cherry picked from commit 8317791)

Fixes https://bugzilla.redhat.com/show_bug.cgi?id=1563361
  • Loading branch information
agrare authored and simaishi committed Apr 3, 2018
1 parent 6050bd3 commit 57c3a3b
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
class ManageIQ::Providers::Nuage::NetworkManager::EventCatcher::MessagingHandler < Qpid::Proton::MessagingHandler
attr_reader :errors

def initialize(options = {})
require 'qpid_proton'

Expand All @@ -10,6 +12,7 @@ def initialize(options = {})
@message_handler_block = @options.delete(:message_handler_block)
@url = @options.delete(:url)
@timeout = @options.delete(:amqp_connect_timeout) || 5.seconds
@errors = []
end

def on_container_start(container)
Expand All @@ -18,31 +21,47 @@ def on_container_start(container)
@topics.each { |topic| @conn.open_receiver("topic://#{topic}") }
end
rescue Timeout::Error
raise MiqException::MiqHostError, "Timeout connecting to AMQP endpoint #{@url}"
add_error(MiqException::MiqHostError.new("Timeout connecting to AMQP endpoint #{@url}"), container)
rescue Errno::ECONNREFUSED => err
add_error(MiqException::MiqHostError.new("ECONNREFUSED connecting to AMQP endpoint #{@url}: #{err}"), container)
rescue SocketError => err
add_error(MiqException::MiqHostError.new("Error connecting to AMQP endpoint #{@url}: #{err}"), container)
end

def on_connection_open(connection)
# In case connection test was requested, close the connection immediately.
connection.container.stop if @test_connection
end

def on_connection_error(_connection)
raise MiqException::MiqInvalidCredentialsError, "Connection failed due to bad username or password"
end

def on_transport_error(_transport)
raise MiqException::MiqHostError, "Transport error"
def on_connection_error(connection)
add_error("AMQP connection error: #{connection.condition}")
end

def on_message(_delivery, message)
@message_handler_block&.call(JSON.parse(message.body))
end

def on_transport_close(_transport)
raise MiqException::MiqHostError, "Transport closed unexpectedly"
def stop
unless @conn.nil? || @conn.container.stopped
$nuage_log.debug("#{self.class.log_prefix} Stopping AMQP")
@conn.container.stop
end
@conn = nil
end

# Memorize error and request container stop if container is given.
def add_error(err, container_to_stop = nil)
err = MiqException::Error.new(err) if err.kind_of?(String)
$nuage_log.debug("#{self.class.log_prefix} #{err.class.name}: #{err.message}")
@errors << err
container_to_stop.stop unless container_to_stop.nil? || container_to_stop.stopped
end

def stop
@conn&.close
def raise_for_error
raise @errors.first unless @errors.empty? # first error is root cause, others are just followup
end

def self.log_prefix
"MIQ(#{name})"
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ def self.test_amqp_connection(options = {})
stream = new(options)
ok = stream.with_fallback_urls(options[:urls]) do
stream.connection.run
stream.connection.handler.raise_for_error
return true
end
raise MiqException::MiqHostError, "Could not connect to any of the #{options[:urls].count} AMQP hostnames" unless ok
true
end

def self.log_prefix
"MIQ(#{self.class.name})"
"MIQ(#{name})"
end

def initialize(options = {})
Expand All @@ -31,6 +32,7 @@ def start(&message_handler_block)
@options[:message_handler_block] = message_handler_block if message_handler_block
with_fallback_urls(@options[:urls]) do
connection.run
connection.handler.raise_for_error
end
end

Expand All @@ -53,7 +55,7 @@ def with_fallback_urls(urls)
begin
@options[:url] = url
yield
rescue MiqException::MiqHostError, Errno::ECONNREFUSED, SocketError => err
rescue MiqException::Error => err
$nuage_log.info("#{self.class.log_prefix} #{endpoint_str} errored: #{err}")
stop
reset_connection
Expand Down
6 changes: 4 additions & 2 deletions spec/models/manageiq/providers/nuage/network_manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@

context 'AMQP connection' do
before do
@conn = double
@conn = double('connection', :handler => handler)
allow(Qpid::Proton::Container).to receive(:new).and_return(@conn)

creds = {}
Expand All @@ -62,9 +62,11 @@
@ems.update_authentication(creds, :save => false)
end

let(:handler) { double('handler') }

it 'verifies AMQP credentials' do
allow(@conn).to receive(:run).and_return(true)

expect(handler).to receive(:raise_for_error)
expect(@ems.verify_credentials(:amqp)).to be_truthy
end

Expand Down

0 comments on commit 57c3a3b

Please sign in to comment.