Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't raise exceptions from within AMQP callbacks #78

Merged
merged 1 commit into from
Mar 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
class ManageIQ::Providers::Nuage::NetworkManager::EventCatcher::MessagingHandler < Qpid::Proton::MessagingHandler
attr_reader :errors

def initialize(options = {})
require 'qpid_proton'

Expand All @@ -10,6 +12,7 @@ def initialize(options = {})
@message_handler_block = @options.delete(:message_handler_block)
@url = @options.delete(:url)
@timeout = @options.delete(:amqp_connect_timeout) || 5.seconds
@errors = []
end

def on_container_start(container)
Expand All @@ -18,31 +21,47 @@ def on_container_start(container)
@topics.each { |topic| @conn.open_receiver("topic://#{topic}") }
end
rescue Timeout::Error
raise MiqException::MiqHostError, "Timeout connecting to AMQP endpoint #{@url}"
add_error(MiqException::MiqHostError.new("Timeout connecting to AMQP endpoint #{@url}"), container)
rescue Errno::ECONNREFUSED => err
add_error(MiqException::MiqHostError.new("ECONNREFUSED connecting to AMQP endpoint #{@url}: #{err}"), container)
rescue SocketError => err
add_error(MiqException::MiqHostError.new("Error connecting to AMQP endpoint #{@url}: #{err}"), container)
end

def on_connection_open(connection)
# In case connection test was requested, close the connection immediately.
connection.container.stop if @test_connection
end

def on_connection_error(_connection)
raise MiqException::MiqInvalidCredentialsError, "Connection failed due to bad username or password"
end

def on_transport_error(_transport)
raise MiqException::MiqHostError, "Transport error"
def on_connection_error(connection)
add_error("AMQP connection error: #{connection.condition}")
end

def on_message(_delivery, message)
@message_handler_block&.call(JSON.parse(message.body))
end

def on_transport_close(_transport)
raise MiqException::MiqHostError, "Transport closed unexpectedly"
def stop
unless @conn.nil? || @conn.container.stopped
$nuage_log.debug("#{self.class.log_prefix} Stopping AMQP")
@conn.container.stop
end
@conn = nil
end

# Memorize error and request container stop if container is given.
def add_error(err, container_to_stop = nil)
err = MiqException::Error.new(err) if err.kind_of?(String)
$nuage_log.debug("#{self.class.log_prefix} #{err.class.name}: #{err.message}")
@errors << err
container_to_stop.stop unless container_to_stop.nil? || container_to_stop.stopped
end

def stop
@conn&.close
def raise_for_error
raise @errors.first unless @errors.empty? # first error is root cause, others are just followup
end

def self.log_prefix
"MIQ(#{name})"
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ def self.test_amqp_connection(options = {})
stream = new(options)
ok = stream.with_fallback_urls(options[:urls]) do
stream.connection.run
stream.connection.handler.raise_for_error
return true
end
raise MiqException::MiqHostError, "Could not connect to any of the #{options[:urls].count} AMQP hostnames" unless ok
true
end

def self.log_prefix
"MIQ(#{self.class.name})"
"MIQ(#{name})"
end

def initialize(options = {})
Expand All @@ -31,6 +32,7 @@ def start(&message_handler_block)
@options[:message_handler_block] = message_handler_block if message_handler_block
with_fallback_urls(@options[:urls]) do
connection.run
connection.handler.raise_for_error
end
end

Expand All @@ -53,7 +55,7 @@ def with_fallback_urls(urls)
begin
@options[:url] = url
yield
rescue MiqException::MiqHostError, Errno::ECONNREFUSED, SocketError => err
rescue MiqException::Error => err
$nuage_log.info("#{self.class.log_prefix} #{endpoint_str} errored: #{err}")
stop
reset_connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@

context 'AMQP connection' do
before do
@conn = double
@conn = double('connection', :handler => handler)
allow(Qpid::Proton::Container).to receive(:new).and_return(@conn)

creds = {}
Expand All @@ -62,9 +62,11 @@
@ems.update_authentication(creds, :save => false)
end

let(:handler) { double('handler') }

it 'verifies AMQP credentials' do
allow(@conn).to receive(:run).and_return(true)

expect(handler).to receive(:raise_for_error)
expect(@ems.verify_credentials(:amqp)).to be_truthy
end

Expand Down