Skip to content

Commit

Permalink
Merge pull request #1382 from fluent/migrate-plugins-with-server-plug…
Browse files Browse the repository at this point in the history
…in-helper

Migrate plugins with server plugin helper
  • Loading branch information
tagomoris authored Dec 21, 2016
2 parents 934ddf9 + 440507b commit cbcdc5a
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 119 deletions.
63 changes: 46 additions & 17 deletions lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class ForwardInput < Input
config_param :chunk_size_limit, :size, default: nil
desc 'Skip an event if incoming event is invalid.'
config_param :skip_invalid_event, :bool, default: false

desc "The field name of the client's source address."
config_param :source_address_key, :string, default: nil
desc "The field name of the client's hostname."
config_param :source_hostname_key, :string, default: nil

Expand Down Expand Up @@ -98,6 +101,7 @@ def configure(conf)
raise Fluent::ConfigError, "resolve_hostname must be true with source_hostname_key"
end
end
@enable_field_injection = @source_address_key || @source_hostname_key

if @security
if @security.user_auth && @security.users.empty?
Expand Down Expand Up @@ -136,7 +140,6 @@ def configure(conf)
})
end
end
@lsock = @usock = nil
end

HEARTBEAT_UDP_PAYLOAD = "\0"
Expand Down Expand Up @@ -197,7 +200,7 @@ def handle_connection(conn)
log.debug "connection established", address: conn.remote_addr, port: conn.remote_port
state = :established
when :established
options = on_message(msg, chunk_size, conn.remote_host)
options = on_message(msg, chunk_size, conn)
if options && r = response(options)
log.trace "sent response to fluent socket", address: conn.remote_addr, response: r
conn.on_write_complete{ conn.close } if @deny_keepalive
Expand Down Expand Up @@ -253,26 +256,26 @@ def response(option)
nil
end

def on_message(msg, chunk_size, remote_host)
def on_message(msg, chunk_size, conn)
if msg.nil?
# for future TCP heartbeat_request
return
end

# TODO: raise an exception if broken chunk is generated by recoverable situation
unless msg.is_a?(Array)
log.warn "incoming chunk is broken:", host: remote_host, msg: msg
log.warn "incoming chunk is broken:", host: conn.remote_host, msg: msg
return
end

tag = msg[0]
entries = msg[1]

if @chunk_size_limit && (chunk_size > @chunk_size_limit)
log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, host: remote_host, limit: @chunk_size_limit, size: chunk_size
log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, host: conn.remote_host, limit: @chunk_size_limit, size: chunk_size
return
elsif @chunk_size_warn_limit && (chunk_size > @chunk_size_warn_limit)
log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, host: remote_host, limit: @chunk_size_warn_limit, size: chunk_size
log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, host: conn.remote_host, limit: @chunk_size_warn_limit, size: chunk_size
end

case entries
Expand All @@ -282,14 +285,16 @@ def on_message(msg, chunk_size, remote_host)
size = (option && option['size']) || 0
es_class = (option && option['compressed'] == 'gzip') ? Fluent::CompressedMessagePackEventStream : Fluent::MessagePackEventStream
es = es_class.new(entries, nil, size.to_i)
es = check_and_skip_invalid_event(tag, es, remote_host) if @skip_invalid_event
es = add_source_host(es, remote_host) if @source_hostname_key
es = check_and_skip_invalid_event(tag, es, conn.remote_host) if @skip_invalid_event
if @enable_field_injection
es = add_source_info(es, conn)
end
router.emit_stream(tag, es)

when Array
# Forward
es = if @skip_invalid_event
check_and_skip_invalid_event(tag, entries, remote_host)
check_and_skip_invalid_event(tag, entries, conn.remote_host)
else
es = Fluent::MultiEventStream.new
entries.each { |e|
Expand All @@ -301,7 +306,9 @@ def on_message(msg, chunk_size, remote_host)
}
es
end
es = add_source_host(es, remote_host) if @source_hostname_key
if @enable_field_injection
es = add_source_info(es, conn)
end
router.emit_stream(tag, es)
option = msg[2]

Expand All @@ -310,12 +317,15 @@ def on_message(msg, chunk_size, remote_host)
time = msg[1]
record = msg[2]
if @skip_invalid_event && invalid_event?(tag, time, record)
log.warn "got invalid event and drop it:", host: remote_host, tag: tag, time: time, record: record
log.warn "got invalid event and drop it:", host: conn.remote_host, tag: tag, time: time, record: record
return msg[3] # retry never succeeded so return ack and drop incoming event.
end
return if record.nil?
time = Fluent::Engine.now if time.to_i == 0
record[@source_hostname_key] = remote_host if @source_hostname_key
if @enable_field_injection
record[@source_address_key] = conn.remote_addr if @source_address_key
record[@source_hostname_key] = conn.remote_host if @source_hostname_key
end
router.emit(tag, time, record)
option = msg[3]
end
Expand All @@ -340,12 +350,31 @@ def check_and_skip_invalid_event(tag, es, remote_host)
new_es
end

def add_source_host(es, host)
def add_source_info(es, conn)
new_es = Fluent::MultiEventStream.new
es.each { |time, record|
record[@source_hostname_key] = host
new_es.add(time, record)
}
if @source_address_key && @source_hostname_key
address = conn.remote_addr
hostname = conn.remote_host
es.each { |time, record|
record[@source_address_key] = address
record[@source_hostname_key] = hostname
new_es.add(time, record)
}
elsif @source_address_key
address = conn.remote_addr
es.each { |time, record|
record[@source_address_key] = address
new_es.add(time, record)
}
elsif @source_hostname_key
hostname = conn.remote_host
es.each { |time, record|
record[@source_hostname_key] = hostname
new_es.add(time, record)
}
else
raise "BUG: don't call this method in this case"
end
new_es
end

Expand Down
2 changes: 2 additions & 0 deletions lib/fluent/plugin/in_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ def parse(text)
class HttpInput < Input
Fluent::Plugin.register_input('http', self)

# TODO: update this plugin implementation to use server plugin helper, after adding keepalive feature on it

helpers :parser, :compat_parameters, :event_loop

EMPTY_GIF_IMAGE = "GIF89a\u0001\u0000\u0001\u0000\x80\xFF\u0000\xFF\xFF\xFF\u0000\u0000\u0000,\u0000\u0000\u0000\u0000\u0001\u0000\u0001\u0000\u0000\u0002\u0002D\u0001\u0000;".force_encoding("UTF-8")
Expand Down
88 changes: 52 additions & 36 deletions lib/fluent/plugin/in_syslog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,17 @@
# limitations under the License.
#

require 'cool.io'
require 'yajl'

require 'fluent/plugin/input'
require 'fluent/config/error'
require 'fluent/plugin/parser'

require 'yajl'

module Fluent::Plugin
class SyslogInput < Input
Fluent::Plugin.register_input('syslog', self)

helpers :parser, :compat_parameters, :event_loop
helpers :parser, :compat_parameters, :server

DEFAULT_PARSER = 'syslog'
SYSLOG_REGEXP = /^\<([0-9]+)\>(.*)/
Expand Down Expand Up @@ -68,11 +67,6 @@ class SyslogInput < Input
7 => 'debug'
}

def initialize
super
require 'fluent/plugin/socket_util'
end

desc 'The port to listen to.'
config_param :port, :integer, default: 5140
desc 'The bind address to listen to.'
Expand All @@ -81,14 +75,22 @@ def initialize
config_param :tag, :string
desc 'The transport protocol used to receive logs.(udp, tcp)'
config_param :protocol_type, :enum, list: [:tcp, :udp], default: :udp

desc 'If true, add source host to event record.'
config_param :include_source_host, :bool, default: false
config_param :include_source_host, :bool, default: false, deprecated: 'use "source_hostname_key" or "source_address_key" instead.'
desc 'Specify key of source host when include_source_host is true.'
config_param :source_host_key, :string, default: 'source_host'.freeze

desc 'The field name of hostname of sender.'
config_param :source_hostname_key, :string, default: nil
desc 'The field name of source address of sender.'
config_param :source_address_key, :string, default: nil

desc 'The field name of the priority.'
config_param :priority_key, :string, default: nil
desc 'The field name of the facility.'
config_param :facility_key, :string, default: nil

config_param :blocking_timeout, :time, default: 0.5
config_param :message_length_limit, :size, default: 2048

Expand All @@ -107,25 +109,57 @@ def configure(conf)
@parser = parser_create
@parser_parse_priority = @parser.respond_to?(:with_priority) && @parser.with_priority

if @include_source_host
if @source_address_key
raise Fluent::ConfigError, "specify either source_address_key or include_source_host"
end
@source_address_key = @source_host_key
end
@resolve_name = !!@source_hostname_key

@_event_loop_run_timeout = @blocking_timeout
end

def start
super

@handler = listen(method(:message_handler))
event_loop_attach(@handler)
log.info "listening syslog socket on #{@bind}:#{@port} with #{@protocol_type}"
case @protocol_type
when :udp then start_udp_server
when :tcp then start_tcp_server
else
raise "BUG: invalid protocol_type value:#{@protocol_type}"
end
end

def shutdown
@handler.close
def start_udp_server
server_create_udp(:in_syslog_udp_server, @port, bind: @bind, max_bytes: @message_length_limit, resolve_name: @resolve_name) do |data, sock|
message_handler(data.chomp, sock)
end
end

super
def start_tcp_server
# syslog family add "\n" to each message and this seems only way to split messages in tcp stream
delimiter = "\n"
delimiter_size = delimiter.size
server_create_connection(:in_syslog_tcp_server, @port, bind: @bind, resolve_name: @resolve_name) do |conn|
buffer = ""
conn.data do |data|
buffer << data
pos = 0
while idx = buffer.index(delimiter, pos)
msg = buffer[pos...idx]
pos = idx + delimiter_size
message_handler(msg, conn)
end
buffer.slice!(0, pos) if pos > 0
end
end
end

private

def message_handler(data, addr)
def message_handler(data, sock)
pri = nil
text = data
unless @parser_parse_priority
Expand All @@ -150,7 +184,8 @@ def message_handler(data, addr)

record[@priority_key] = priority if @priority_key
record[@facility_key] = facility if @facility_key
record[@source_host_key] = addr[2] if @include_source_host
record[@source_address_key] = sock.remote_addr if @source_address_key
record[@source_hostname_key] = sock.remote_host if @source_hostname_key

tag = "#{@tag}.#{facility}.#{priority}"
emit(tag, time, record)
Expand All @@ -160,25 +195,6 @@ def message_handler(data, addr)
log.error_backtrace
end

private

def listen(callback)
log.info "listening syslog socket on #{@bind}:#{@port} with #{@protocol_type}"
socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
if Fluent.windows?
socket_manager_path = socket_manager_path.to_i
end
client = ServerEngine::SocketManager::Client.new(socket_manager_path)
if @protocol_type == :udp
@usock = client.listen_udp(@bind, @port)
Fluent::SocketUtil::UdpHandler.new(@usock, log, @message_length_limit, callback)
else
# syslog family add "\n" to each message and this seems only way to split messages in tcp stream
lsock = client.listen_tcp(@bind, @port)
Coolio::TCPServer.new(lsock, nil, Fluent::SocketUtil::TcpHandler, log, "\n", callback)
end
end

def emit(tag, time, record)
router.emit(tag, time, record)
rescue => e
Expand Down
Loading

0 comments on commit cbcdc5a

Please sign in to comment.