Skip to content

Commit

Permalink
Don't raise exceptions from within AMQP callbacks
Browse files Browse the repository at this point in the history
Raising exceptions from within AMQP callbacks (defained in MessagingHandler)
prevents qpid_proton gem from closing TCP connection properly which results in
file descriptor leakage (see https://issues.apache.org/jira/browse/PROTON-1791).
It's a bug in gem (sockets shouldn't leak no matter what) which will be resolved with
next release (qpid_proton 0.22.0) in a month, but with this commit we avoid raising
exceptions in callbacks to resolve file descriptor problem immediately.

Signed-off-by: Miha Pleško <[email protected]>
  • Loading branch information
miha-plesko committed Mar 19, 2018
1 parent bb7a1a2 commit acf3949
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
class ManageIQ::Providers::Nuage::NetworkManager::EventCatcher::MessagingHandler < Qpid::Proton::MessagingHandler
attr_reader :errors

def initialize(options = {})
require 'qpid_proton'

Expand All @@ -10,6 +12,7 @@ def initialize(options = {})
@message_handler_block = @options.delete(:message_handler_block)
@url = @options.delete(:url)
@timeout = @options.delete(:amqp_connect_timeout) || 5.seconds
@errors = []
end

def on_container_start(container)
Expand All @@ -18,31 +21,47 @@ def on_container_start(container)
@topics.each { |topic| @conn.open_receiver("topic://#{topic}") }
end
rescue Timeout::Error
raise MiqException::MiqHostError, "Timeout connecting to AMQP endpoint #{@url}"
add_error(MiqException::MiqHostError.new("Timeout connecting to AMQP endpoint #{@url}"), container)
rescue Errno::ECONNREFUSED => err
add_error(MiqException::MiqHostError.new("ECONNREFUSED connecting to AMQP endpoint #{@url}: #{err}"), container)
rescue SocketError => err
add_error(MiqException::MiqHostError.new("Error connecting to AMQP endpoint #{@url}: #{err}"), container)
end

def on_connection_open(connection)
# In case connection test was requested, close the connection immediately.
connection.container.stop if @test_connection
end

def on_connection_error(_connection)
raise MiqException::MiqInvalidCredentialsError, "Connection failed due to bad username or password"
end

def on_transport_error(_transport)
raise MiqException::MiqHostError, "Transport error"
def on_connection_error(connection)
add_error("AMQP connection error: #{connection.condition}")
end

def on_message(_delivery, message)
@message_handler_block&.call(JSON.parse(message.body))
end

def on_transport_close(_transport)
raise MiqException::MiqHostError, "Transport closed unexpectedly"
def stop
unless @conn.nil? || @conn.container.stopped
$nuage_log.debug("#{self.class.log_prefix} Stopping AMQP")
@conn.container.stop
end
@conn = nil
end

# Memorize error and request container stop if container is given.
def add_error(err, container_to_stop = nil)
err = MiqException::Error.new(err) if err.kind_of?(String)
$nuage_log.debug("#{self.class.log_prefix} #{err.class.name}: #{err.message}")
@errors << err
container_to_stop.stop unless container_to_stop.nil? || container_to_stop.stopped
end

def stop
@conn&.close
def raise_for_error
raise @errors.first unless @errors.empty? # first error is root cause, others are just followup
end

def self.log_prefix
"MIQ(#{name})"
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ def self.test_amqp_connection(options = {})
stream = new(options)
ok = stream.with_fallback_urls(options[:urls]) do
stream.connection.run
stream.connection.handler.raise_for_error
return true
end
raise MiqException::MiqHostError, "Could not connect to any of the #{options[:urls].count} AMQP hostnames" unless ok
true
end

def self.log_prefix
"MIQ(#{self.class.name})"
"MIQ(#{name})"
end

def initialize(options = {})
Expand All @@ -31,6 +32,7 @@ def start(&message_handler_block)
@options[:message_handler_block] = message_handler_block if message_handler_block
with_fallback_urls(@options[:urls]) do
connection.run
connection.handler.raise_for_error
end
end

Expand All @@ -53,7 +55,7 @@ def with_fallback_urls(urls)
begin
@options[:url] = url
yield
rescue MiqException::MiqHostError, Errno::ECONNREFUSED, SocketError => err
rescue MiqException::Error => err
$nuage_log.info("#{self.class.log_prefix} #{endpoint_str} errored: #{err}")
stop
reset_connection
Expand Down
6 changes: 4 additions & 2 deletions spec/models/manageiq/providers/nuage/network_manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@

context 'AMQP connection' do
before do
@conn = double
@conn = double('connection', :handler => handler)
allow(Qpid::Proton::Container).to receive(:new).and_return(@conn)

creds = {}
Expand All @@ -62,9 +62,11 @@
@ems.update_authentication(creds, :save => false)
end

let(:handler) { double('handler') }

it 'verifies AMQP credentials' do
allow(@conn).to receive(:run).and_return(true)

expect(handler).to receive(:raise_for_error)
expect(@ems.verify_credentials(:amqp)).to be_truthy
end

Expand Down

0 comments on commit acf3949

Please sign in to comment.