Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manually close AMQP sockets upon stopping event catcher #76

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def initialize(options = {})
end

def on_container_start(container)
$nuage_log.debug("#{self.class.log_prefix} Starting AMQP")
Timeout.timeout(@timeout) { @conn = container.connect(@url, @options) }
unless @test_connection
@topics.each { |topic| @conn.open_receiver("topic://#{topic}") }
Expand All @@ -26,23 +27,37 @@ def on_connection_open(connection)
connection.container.stop if @test_connection
end

def on_connection_error(_connection)
raise MiqException::MiqInvalidCredentialsError, "Connection failed due to bad username or password"
def on_connection_error(connection)
msg = "#{self.class.log_prefix} AMQP on_connection_error: #{connection.condition}"
$nuage_log.debug(msg)
raise MiqException::MiqHostError, msg
end

def on_transport_error(_transport)
raise MiqException::MiqHostError, "Transport error"
def on_transport_error(transport)
msg = "#{self.class.log_prefix} AMQP on_transport_error: #{transport.connection.condition}"
$nuage_log.debug(msg)
raise MiqException::MiqHostError, msg
end

def on_message(_delivery, message)
@message_handler_block&.call(JSON.parse(message.body))
end

def on_transport_close(_transport)
raise MiqException::MiqHostError, "Transport closed unexpectedly"
def on_transport_close(transport)
msg = "#{self.class.log_prefix} AMQP on_transport_close: #{transport.connection.condition}"
$nuage_log.debug(msg)
raise MiqException::MiqHostError, msg
end

def stop
@conn&.close
unless @conn.nil? || @conn.container.stopped
$nuage_log.debug("#{self.class.log_prefix} Stopping AMQP")
@conn.container.stop
end
@conn = nil
end

def self.log_prefix
"MIQ(#{name})"
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# NOTE: here we patch Qpid::Proton::Container object to tackle a blocking bug that we observed where
# sockets are not being closed properly upon connection close which results in file descriptor leakage,
# see https://issues.apache.org/jira/browse/PROTON-1791
# TODO: remove this entire class once the issues is fixed (hopefully with qpid_proton 0.22.0)

class ManageIQ::Providers::Nuage::NetworkManager::EventCatcher::QpidContainer < Qpid::Proton::Container
# Override to capture connection drivers.
def connection_driver(io, opts = nil, server = false)
driver = super(io, opts, server)

# Capture connection drivers so that we can manually close them later.
@drivers ||= []
@drivers << driver

driver
end

# Override to manually close sockets upon container stop.
def stop(error = nil)
$nuage_log.debug("#{self.class.log_prefix} Stop container called")
super(error)
close_sockets
end

def self.log_prefix
"MIQ(#{name})"
end

private

# Manually close sockets or else they remain in CLOSE_WAIT statue forever consuming file descriptors.
def close_sockets
$nuage_log.debug("#{self.class.log_prefix} Closing sockets")
@drivers.each { |d| d.to_io.close } if @drivers
@drivers = nil
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def stop
def connection
unless @connection
@handler = ManageIQ::Providers::Nuage::NetworkManager::EventCatcher::MessagingHandler.new(@options.clone)
@connection = Qpid::Proton::Container.new(@handler)
@connection = ManageIQ::Providers::Nuage::NetworkManager::EventCatcher::QpidContainer.new(@handler)
end
@connection
end
Expand Down