Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock between on_writable and close #2165

Merged
merged 2 commits into from
Nov 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def handle_connection(conn)
when :pingpong
success, reason_or_salt, shared_key = check_ping(msg, conn.remote_addr, user_auth_salt, nonce)
unless success
conn.on(:write_complete) { |c| c.close }
conn.on(:write_complete) { |c| c.close_after_write_complete }
send_data.call(serializer, generate_pong(false, reason_or_salt, nonce, shared_key))
next
end
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,7 @@ def on_read(sock, ri, data)
when :pingpong
succeeded, reason = check_pong(ri, data)
unless succeeded
@log.warn "connection refused to #{@name}: #{reason}"
@log.warn "connection refused to #{@name || @host}: #{reason}"
disable! # shutdown
return
end
Expand Down
16 changes: 16 additions & 0 deletions lib/fluent/plugin_helper/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,10 @@ def write(data)
raise "not implemented here"
end

def close_after_write_complete
@sock.close_after_write_complete = true
end

def close
@sock.close if @close_socket
end
Expand Down Expand Up @@ -492,6 +496,8 @@ def write(data)

module EventHandler
class UDPServer < Coolio::IO
attr_writer :close_after_write_complete # dummy for consistent method call in callbacks

def initialize(sock, max_bytes, flags, close_socket, log, under_plugin_development, &callback)
raise ArgumentError, "socket must be a UDPSocket: sock = #{sock}" unless sock.is_a?(UDPSocket)

Expand Down Expand Up @@ -543,6 +549,7 @@ def on_readable_with_sock

class TCPServer < Coolio::TCPSocket
attr_reader :closing
attr_writer :close_after_write_complete

def initialize(sock, socket_option_setter, close_callback, log, under_plugin_development, connect_callback)
raise ArgumentError, "socket must be a TCPSocket: sock=#{sock}" unless sock.is_a?(TCPSocket)
Expand All @@ -560,6 +567,7 @@ def initialize(sock, socket_option_setter, close_callback, log, under_plugin_dev
@close_callback = close_callback

@callback_connection = nil
@close_after_write_complete = false
@closing = false

@mutex = Mutex.new # to serialize #write and #close
Expand Down Expand Up @@ -587,6 +595,11 @@ def write(data)
end
end

def on_writable
super
close if @close_after_write_complete
end

def on_connect
@callback_connection = TCPCallbackSocket.new(self)
@connect_callback.call(@callback_connection)
Expand Down Expand Up @@ -625,6 +638,7 @@ def close

class TLSServer < Coolio::Socket
attr_reader :closing
attr_writer :close_after_write_complete

# It can't use Coolio::TCPSocket, because Coolio::TCPSocket checks that underlying socket (1st argument of super) is TCPSocket.
def initialize(sock, context, socket_option_setter, close_callback, log, under_plugin_development, connect_callback)
Expand All @@ -645,6 +659,7 @@ def initialize(sock, context, socket_option_setter, close_callback, log, under_p
@close_callback = close_callback

@callback_connection = nil
@close_after_write_complete = false
@closing = false

@mutex = Mutex.new # to serialize #write and #close
Expand Down Expand Up @@ -738,6 +753,7 @@ def on_writable
@_handler_write_buffer.slice!(0, written_bytes)
super
end
close if @close_after_write_complete
rescue IO::WaitWritable, IO::WaitReadable
return
rescue Errno::EINTR
Expand Down