Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce socket plugin helper #1356

Merged
merged 8 commits into from
Dec 9, 2016
368 changes: 195 additions & 173 deletions lib/fluent/plugin/out_forward.rb

Large diffs are not rendered by default.

117 changes: 71 additions & 46 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class Output < Base

CHUNKING_FIELD_WARN_NUM = 4

PROCESS_CLOCK_ID = Process::CLOCK_MONOTONIC_RAW rescue Process::CLOCK_MONOTONIC

config_param :time_as_integer, :bool, default: false

# `<buffer>` and `<secondary>` sections are available only when '#format' and '#write' are implemented
Expand Down Expand Up @@ -138,7 +140,7 @@ def prefer_delayed_commit
end

# Internal states
FlushThreadState = Struct.new(:thread, :next_time)
FlushThreadState = Struct.new(:thread, :next_clock)
DequeuedChunkInfo = Struct.new(:chunk_id, :time, :timeout) do
def expired?
time + timeout < Time.now
Expand Down Expand Up @@ -898,9 +900,9 @@ def commit_write(chunk_id, delayed: @delayed_commit, secondary: false)
@retry_mutex.synchronize do
if @retry # success to flush chunks in retries
if secondary
log.warn "retry succeeded by secondary.", plugin_id: plugin_id, chunk_id: dump_unique_id_hex(chunk_id)
log.warn "retry succeeded by secondary.", chunk_id: dump_unique_id_hex(chunk_id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove plugin_id from the logs? Redundancy?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundancy. The plugin logger fills it in automatically now.

else
log.warn "retry succeeded.", plugin_id: plugin_id, chunk_id: dump_unique_id_hex(chunk_id)
log.warn "retry succeeded.", chunk_id: dump_unique_id_hex(chunk_id)
end
@retry = nil
end
Expand All @@ -918,6 +920,8 @@ def rollback_write(chunk_id)
# in many cases, false can be just ignored
if @buffer.takeback_chunk(chunk_id)
@counters_monitor.synchronize{ @rollback_count += 1 }
primary = @as_secondary ? @primary_instance : self
primary.update_retry_state(chunk_id, @as_secondary)
true
else
false
Expand All @@ -930,7 +934,9 @@ def try_rollback_write
info = @dequeued_chunks.shift
if @buffer.takeback_chunk(info.chunk_id)
@counters_monitor.synchronize{ @rollback_count += 1 }
log.warn "failed to flush the buffer chunk, timeout to commit.", plugin_id: plugin_id, chunk_id: dump_unique_id_hex(info.chunk_id), flushed_at: info.time
log.warn "failed to flush the buffer chunk, timeout to commit.", chunk_id: dump_unique_id_hex(info.chunk_id), flushed_at: info.time
primary = @as_secondary ? @primary_instance : self
primary.update_retry_state(info.chunk_id, @as_secondary)
end
end
end
Expand All @@ -943,7 +949,9 @@ def try_rollback_all
info = @dequeued_chunks.shift
if @buffer.takeback_chunk(info.chunk_id)
@counters_monitor.synchronize{ @rollback_count += 1 }
log.info "delayed commit for buffer chunks was cancelled in shutdown", plugin_id: plugin_id, chunk_id: dump_unique_id_hex(info.chunk_id)
log.info "delayed commit for buffer chunks was cancelled in shutdown", chunk_id: dump_unique_id_hex(info.chunk_id)
primary = @as_secondary ? @primary_instance : self
primary.update_retry_state(info.chunk_id, @as_secondary)
end
end
end
Expand Down Expand Up @@ -997,43 +1005,60 @@ def try_flush
log.trace "done to commit a chunk", chunk: dump_chunk_id
end
rescue => e
log.debug "taking back chunk for errors.", plugin_id: plugin_id, chunk: dump_unique_id_hex(chunk.unique_id)
log.debug "taking back chunk for errors.", chunk: dump_unique_id_hex(chunk.unique_id)
if output.delayed_commit
@dequeued_chunks_mutex.synchronize do
@dequeued_chunks.delete_if{|d| d.chunk_id == chunk.unique_id }
end
end
@buffer.takeback_chunk(chunk.unique_id)

@retry_mutex.synchronize do
if @retry
@counters_monitor.synchronize{ @num_errors += 1 }
if @retry.limit?
records = @buffer.queued_records
log.error "failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue.", plugin_id: plugin_id, retry_times: @retry.steps, records: records, error: e
log.error_backtrace e.backtrace
@buffer.clear_queue!
log.debug "buffer queue cleared", plugin_id: plugin_id
@retry = nil
else
@retry.step
msg = if using_secondary
"failed to flush the buffer with secondary output."
else
"failed to flush the buffer."
end
log.warn msg, plugin_id: plugin_id, retry_time: @retry.steps, next_retry: @retry.next_time, chunk: dump_unique_id_hex(chunk.unique_id), error: e
log.warn_backtrace e.backtrace
end
update_retry_state(chunk.unique_id, using_secondary, e)

raise if @under_plugin_development && !@retry_for_error_chunk
end
end

def update_retry_state(chunk_id, using_secondary, error = nil)
@retry_mutex.synchronize do
@counters_monitor.synchronize{ @num_errors += 1 }
chunk_id_hex = dump_unique_id_hex(chunk_id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

chunk_id_hex is used only in log messages.
Not critical, but avoiding the dump_unique_id_hex call when it is not needed would be a bit better.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are almost no code paths that do not output a log with chunk_id_hex.
The only such paths are:

  • @retry exists, and
    • without error (called from #rollback_write, #try_rollback_write or #try_rollback_all)
    • or
    • @retry.limit? is true (all chunks will be cleared)

That variable is referenced in most cases, so I think it's worth assigning the value to a variable to reduce code complexity.


unless @retry
@retry = retry_state(@buffer_config.retry_randomize)
if error
log.warn "failed to flush the buffer.", retry_time: @retry.steps, next_retry_seconds: @retry.next_time, chunk: chunk_id_hex, error: error
log.warn_backtrace error.backtrace
end
return
end

# @retry exists

if error
if @retry.limit?
records = @buffer.queued_records
msg = "failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue."
log.error msg, retry_times: @retry.steps, records: records, error: error
log.error_backtrace error.backtrace
elsif using_secondary
msg = "failed to flush the buffer with secondary output."
log.warn msg, retry_time: @retry.steps, next_retry_seconds: @retry.next_time, chunk: chunk_id_hex, error: error
log.warn_backtrace error.backtrace
else
@retry = retry_state(@buffer_config.retry_randomize)
@counters_monitor.synchronize{ @num_errors += 1 }
log.warn "failed to flush the buffer.", plugin_id: plugin_id, retry_time: @retry.steps, next_retry: @retry.next_time, chunk: dump_unique_id_hex(chunk.unique_id), error: e
log.warn_backtrace e.backtrace
msg = "failed to flush the buffer."
log.warn msg, retry_time: @retry.steps, next_retry_seconds: @retry.next_time, chunk: chunk_id_hex, error: error
log.warn_backtrace error.backtrace
end
end

raise if @under_plugin_development && !@retry_for_error_chunk
if @retry.limit?
@buffer.clear_queue!
log.debug "buffer queue cleared"
@retry = nil
else
@retry.step
end
end
end

Expand All @@ -1060,7 +1085,7 @@ def submit_flush_once
# Without locks: it is rough but enough to select "next" writer selection
@output_flush_thread_current_position = (@output_flush_thread_current_position + 1) % @buffer_config.flush_thread_count
state = @output_flush_threads[@output_flush_thread_current_position]
state.next_time = 0
state.next_clock = 0
if state.thread && state.thread.status # "run"/"sleep"/"aborting" or false(successfully stop) or nil(killed by exception)
state.thread.run
else
Expand Down Expand Up @@ -1102,7 +1127,7 @@ def enqueue_thread_wait
# only for tests of output plugin
def flush_thread_wakeup
@output_flush_threads.each do |state|
state.next_time = 0
state.next_clock = 0
state.thread.run
end
end
Expand Down Expand Up @@ -1156,7 +1181,7 @@ def enqueue_thread_run
end
rescue => e
raise if @under_plugin_development
log.error "unexpected error while checking flushed chunks. ignored.", plugin_id: plugin_id, error: e
log.error "unexpected error while checking flushed chunks. ignored.", error: e
log.error_backtrace
ensure
@output_enqueue_thread_waiting = false
Expand All @@ -1166,7 +1191,7 @@ def enqueue_thread_run
end
rescue => e
# normal errors are rescued by inner begin-rescue clause.
log.error "error on enqueue thread", plugin_id: plugin_id, error: e
log.error "error on enqueue thread", error: e
log.error_backtrace
raise
end
Expand All @@ -1175,9 +1200,7 @@ def enqueue_thread_run
def flush_thread_run(state)
flush_thread_interval = @buffer_config.flush_thread_interval

# If the given clock_id is not supported, Errno::EINVAL is raised.
clock_id = Process::CLOCK_MONOTONIC_RAW rescue Process::CLOCK_MONOTONIC
state.next_time = Process.clock_gettime(clock_id) + flush_thread_interval
state.next_clock = Process.clock_gettime(PROCESS_CLOCK_ID) + flush_thread_interval

while !self.after_started? && !self.stopped?
sleep 0.5
Expand All @@ -1187,16 +1210,18 @@ def flush_thread_run(state)
begin
# This thread don't use `thread_current_running?` because this thread should run in `before_shutdown` phase
while @output_flush_threads_running
time = Process.clock_gettime(clock_id)
interval = state.next_time - time
current_clock = Process.clock_gettime(PROCESS_CLOCK_ID)
interval = state.next_clock - current_clock

if state.next_time <= time
if state.next_clock <= current_clock && (!@retry || @retry_mutex.synchronize{ @retry.next_time } <= Time.now)
try_flush
# next_flush_interval uses flush_thread_interval or flush_thread_burst_interval (or retrying)

# next_flush_time uses flush_thread_interval or flush_thread_burst_interval (or retrying)
interval = next_flush_time.to_f - Time.now.to_f
# TODO: if secondary && delayed-commit, next_flush_time will be much longer than expected (because @retry still exists)
# @retry should be cleared if delayed commit is enabled? Or any other solution?
state.next_time = Process.clock_gettime(clock_id) + interval
# TODO: if secondary && delayed-commit, next_flush_time will be much longer than expected
# because @retry still exists (#commit_write is not called yet in #try_flush)
# @retry should be cleared if delayed commit is enabled? Or any other solution?
state.next_clock = Process.clock_gettime(PROCESS_CLOCK_ID) + interval
end

if @dequeued_chunks_mutex.synchronize{ !@dequeued_chunks.empty? && @dequeued_chunks.first.expired? }
Expand All @@ -1210,7 +1235,7 @@ def flush_thread_run(state)
rescue => e
# normal errors are rescued by output plugins in #try_flush
# so this rescue section is for critical & unrecoverable errors
log.error "error on output thread", plugin_id: plugin_id, error: e
log.error "error on output thread", error: e
log.error_backtrace
raise
end
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
require 'fluent/plugin_helper/formatter'
require 'fluent/plugin_helper/inject'
require 'fluent/plugin_helper/extract'
# require 'fluent/plugin_helper/socket'
require 'fluent/plugin_helper/socket'
require 'fluent/plugin_helper/server'
require 'fluent/plugin_helper/retry_state'
require 'fluent/plugin_helper/compat_parameters'
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin_helper/inject.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def configure(conf)
if @_inject_hostname_key
@_inject_hostname = @inject_config.hostname
unless @_inject_hostname
@_inject_hostname = Socket.gethostname
@_inject_hostname = ::Socket.gethostname
log.info "using hostname for specified field", host_key: @_inject_hostname_key, host_name: @_inject_hostname
end
end
Expand Down
41 changes: 24 additions & 17 deletions lib/fluent/plugin_helper/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,20 +108,24 @@ def server_create_connection(title, port, proto: :tcp, bind: '0.0.0.0', shared:
# sock.remote_port
# # ...
# end
def server_create(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, backlog: nil, max_bytes: nil, flags: 0, **socket_options, &callback)
def server_create(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, socket: nil, backlog: nil, max_bytes: nil, flags: 0, **socket_options, &callback)
raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol)
raise ArgumentError, "BUG: port must be an integer" unless port && port.is_a?(Integer)
raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto)

raise ArgumentError, "BUG: socket option is available only for udp" if socket && proto != :udp

raise ArgumentError, "BUG: block not specified which handles received data" unless block_given?
raise ArgumentError, "BUG: block must have 1 or 2 arguments" unless callback.arity == 1 || callback.arity == 2

if proto == :tcp || proto == :tls # default linger_timeout only for server
socket_options[:linger_timeout] ||= 0
end

socket_option_validate!(proto, **socket_options)
socket_option_setter = ->(sock){ socket_option_set(sock, **socket_options) }
unless socket
socket_option_validate!(proto, **socket_options)
socket_option_setter = ->(sock){ socket_option_set(sock, **socket_options) }
end

if proto != :tcp && proto != :tls && proto != :unix # options to listen/accept connections
raise ArgumentError, "BUG: backlog is available for tcp/tls" if backlog
Expand All @@ -140,9 +144,15 @@ def server_create(title, port, proto: :tcp, bind: '0.0.0.0', shared: true, backl
raise "not implemented yet"
when :udp
raise ArgumentError, "BUG: max_bytes must be specified for UDP" unless max_bytes
sock = server_create_udp_socket(shared, bind, port)
socket_option_setter.call(sock)
server = EventHandler::UDPServer.new(sock, max_bytes, flags, @log, @under_plugin_development, &callback)
if socket
sock = socket
close_socket = false
else
sock = server_create_udp_socket(shared, bind, port)
socket_option_setter.call(sock)
close_socket = true
end
server = EventHandler::UDPServer.new(sock, max_bytes, flags, close_socket, @log, @under_plugin_development, &callback)
when :unix
raise "not implemented yet"
else
Expand Down Expand Up @@ -267,10 +277,11 @@ def server_create_tls_socket(shared, bind, port)
end

class CallbackSocket
def initialize(server_type, sock, enabled_events = [])
def initialize(server_type, sock, enabled_events = [], close_socket: true)
@server_type = server_type
@sock = sock
@enabled_events = enabled_events
@close_socket = close_socket
end

def remote_addr
Expand All @@ -294,12 +305,7 @@ def write(data)
end

def close
@sock.close
# close cool.io socket in another thread, not to make deadlock
# for flushing @_write_buffer when conn.close is called in callback
# ::Thread.new{
# @sock.close
# }
@sock.close if @close_socket
end

def data(&callback)
Expand Down Expand Up @@ -334,8 +340,8 @@ def write(data)
end

class UDPCallbackSocket < CallbackSocket
def initialize(sock, peeraddr)
super("udp", sock, [])
def initialize(sock, peeraddr, **kwargs)
super("udp", sock, [], **kwargs)
@peeraddr = peeraddr
end

Expand All @@ -358,14 +364,15 @@ def write(data)

module EventHandler
class UDPServer < Coolio::IO
def initialize(sock, max_bytes, flags, log, under_plugin_development, &callback)
def initialize(sock, max_bytes, flags, close_socket, log, under_plugin_development, &callback)
raise ArgumentError, "socket must be a UDPSocket: sock = #{sock}" unless sock.is_a?(UDPSocket)

super(sock)

@sock = sock
@max_bytes = max_bytes
@flags = flags
@close_socket = close_socket
@log = log
@under_plugin_development = under_plugin_development
@callback = callback
Expand Down Expand Up @@ -398,7 +405,7 @@ def on_readable_with_sock
rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNRESET
return
end
@callback.call(data, UDPCallbackSocket.new(@sock, addr))
@callback.call(data, UDPCallbackSocket.new(@sock, addr, close_socket: @close_socket))
rescue => e
@log.error "unexpected error in processing UDP data", error: e
@log.error_backtrace
Expand Down
Loading