Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate exec plugins to v0.14 api #1297

Merged
merged 41 commits into from
Nov 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
1a99a98
fix to raise error for nonsense #commit_write call
tagomoris Nov 1, 2016
323f7e1
add extract support and fix time handling
tagomoris Oct 26, 2016
c7a197c
fix to wait child processes to run actually, and fix timeout feature …
tagomoris Oct 26, 2016
ac5c6f9
show plugin helper name explicitly
tagomoris Oct 26, 2016
d985778
add keep_tag_key/keep_time_key support and warning for compatibility …
tagomoris Oct 26, 2016
0195f60
Add new API methods to show characteristic of data and how to feed data.
tagomoris Oct 26, 2016
ddf20a5
Fix in/out_exec plugins to use standard Formatter/Parser plugins, and…
tagomoris Oct 26, 2016
fcc4144
migrating out_exec_filter to v0.14 APIs, without child_process plugin…
tagomoris Oct 26, 2016
a515906
fix to use convert_values correctly
tagomoris Oct 26, 2016
ab3d12c
shrink lines of code
tagomoris Nov 1, 2016
960072d
Add options "wait_timeout" and "on_exit_callback"
tagomoris Nov 1, 2016
0cf6b5d
fix code to use child_process plugin helper and async commit for bett…
tagomoris Nov 1, 2016
329c15e
fix code to use child_process pluginn helper
tagomoris Nov 1, 2016
9b5d11b
fix typo
tagomoris Nov 1, 2016
469d503
remove code for temp test
tagomoris Nov 1, 2016
58c8408
add test cases using v0.14 style configurations
tagomoris Nov 1, 2016
79ec097
fix to rescue unexpected errors
tagomoris Nov 2, 2016
a32e7ce
add test cases for command unexpectedly dies
tagomoris Nov 2, 2016
3157952
Fix child_process plugin helper to handle exit status of processes co…
tagomoris Nov 2, 2016
969e325
wait process exit at the end of #shutdown: after shutdown, all plugin…
tagomoris Nov 2, 2016
241b370
fix bug not to specify thread title and typo
tagomoris Nov 2, 2016
30cdfdc
fix bug - closing pipes should be done by child_process plugin helper
tagomoris Nov 2, 2016
98412be
fix logging - not to show exitting process in shutdown sequence
tagomoris Nov 2, 2016
6a8b3fa
add tests for v0.14 style configurations and features of num_children…
tagomoris Nov 2, 2016
ac37c03
add example configuration
tagomoris Nov 2, 2016
ae8f042
Revert "fix to raise error for nonsense #commit_write call"
tagomoris Nov 4, 2016
04a339c
add error message pattern for ruby 2.1
tagomoris Nov 4, 2016
8a5faa3
add timezone to timestamps converted into unixtime
tagomoris Nov 4, 2016
768fb81
MONOTONIC_RAW is better, but only for Linux
tagomoris Nov 4, 2016
39b9060
fix bug to never break loop
tagomoris Nov 4, 2016
4686902
make sleep time longer because shorter one fails sometimes on Travis
tagomoris Nov 4, 2016
8c552d7
remove useless variable assignment
tagomoris Nov 8, 2016
dd773b0
fix to pass error object into error stream
tagomoris Nov 8, 2016
2ecf4c8
fix to use constant not to create new string object instances
tagomoris Nov 8, 2016
cf1b2e8
fix to use plugin name
tagomoris Nov 8, 2016
21b9a32
fix description, method signature and indentation
tagomoris Nov 8, 2016
34b81e6
make test stable
tagomoris Nov 8, 2016
a9f5300
fix to check loosely not to fail on CI env
tagomoris Nov 8, 2016
9d8b6e7
take care about tests on Windows
tagomoris Nov 8, 2016
ef79463
make tests more stable
tagomoris Nov 8, 2016
8b19791
fix to use constant not to assign new objects everytime
tagomoris Nov 10, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions example/out_exec_filter.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<source>
@type dummy
@label @exec
tag exec_input
rate 10
auto_increment_key num
dummy {"data":"mydata"}
</source>

<label @exec>
<match exec_input>
@type exec_filter
@label @stdout
tag result
command ruby -e 'STDOUT.sync = true; proc = ->(){line = STDIN.readline.chomp; puts line + "\t" + Process.pid.to_s}; 1000.times{ proc.call }'
num_children 3
child_respawn -1
<inject>
time_key time
time_type float
</inject>
<format>
@type tsv
keys data, num, time
</format>
<parse>
@type tsv
keys data, num, time, pid
</parse>
<extract>
time_key time
time_type float
</extract>
</match>
</label>

<label @stdout>
<match result>
@type stdout
</match>
</label>

5 changes: 5 additions & 0 deletions lib/fluent/plugin/formatter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ class Formatter < Base

configured_in :format

PARSER_TYPES = [:text_per_line, :text, :binary]
def formatter_type
:text_per_line
end

def format(tag, time, record)
raise NotImplementedError, "Implement this method in child class"
end
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/formatter_msgpack.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ module Plugin
class MessagePackFormatter < Formatter
Plugin.register_formatter('msgpack', self)

def formatter_type
:binary
end

def format(tag, time, record)
record.to_msgpack
end
Expand Down
34 changes: 34 additions & 0 deletions lib/fluent/plugin/formatter_tsv.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin/formatter'

module Fluent
module Plugin
class TSVFormatter < Formatter
Plugin.register_formatter('tsv', self)

desc 'Field names included in each lines'
config_param :keys, :array, value_type: :string
desc 'The delimiter character (or string) of TSV values'
config_param :delimiter, :string, default: "\t"

def format(tag, time, record)
@keys.map{|k| record[k].to_s }.join(@delimiter)
end
end
end
end
141 changes: 48 additions & 93 deletions lib/fluent/plugin/in_exec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,136 +14,91 @@
# limitations under the License.
#

require 'strptime'
require 'yajl'

require 'fluent/plugin/input'
require 'fluent/time'
require 'fluent/timezone'
require 'fluent/config/error'
require 'yajl'

module Fluent::Plugin
class ExecInput < Fluent::Plugin::Input
Fluent::Plugin.register_input('exec', self)

helpers :child_process

def initialize
super
require 'fluent/plugin/exec_util'
end
helpers :compat_parameters, :extract, :parser, :child_process

desc 'The command (program) to execute.'
config_param :command, :string
desc 'The format used to map the program output to the incoming event.(tsv,json,msgpack)'
config_param :format, :string, default: 'tsv'
desc 'Specify the comma-separated keys when using the tsv format.'
config_param :keys, default: [] do |val|
val.split(',')

config_section :parse do
config_set_default :@type, 'tsv'
config_set_default :time_type, :float
config_set_default :time_key, nil
config_set_default :estimate_current_event, false
end

config_section :extract do
config_set_default :time_type, :float
end

desc 'Tag of the output events.'
config_param :tag, :string, default: nil
desc 'The key to use as the event tag instead of the value in the event record. '
config_param :tag_key, :string, default: nil
desc 'The key to use as the event time instead of the value in the event record.'
config_param :time_key, :string, default: nil
desc 'The format of the event time used for the time_key parameter.'
config_param :time_format, :string, default: nil
desc 'The interval time between periodic program runs.'
config_param :run_interval, :time, default: nil
desc 'The default block size to read if parser requires partial read.'
config_param :read_block_size, :size, default: 10240 # 10k

def configure(conf)
super

if conf['localtime']
@localtime = true
elsif conf['utc']
@localtime = false
end

if conf['timezone']
@timezone = conf['timezone']
Fluent::Timezone.validate!(@timezone)
end

if !@tag && !@tag_key
raise Fleunt::ConfigError, "'tag' or 'tag_key' option is required on exec input"
end
attr_reader :parser

if @time_key
if @time_format
f = @time_format
@time_parse_proc =
begin
strptime = Strptime.new(f)
Proc.new { |str| Fluent::EventTime.from_time(strptime.exec(str)) }
rescue
Proc.new {|str| Fluent::EventTime.from_time(Time.strptime(str, f)) }
end
else
@time_parse_proc = Proc.new {|str| Fluent::EventTime.from_time(Time.at(str.to_f)) }
def configure(conf)
compat_parameters_convert(conf, :extract, :parser)
['parse', 'extract'].each do |subsection_name|
if subsection = conf.elements(subsection_name).first
if subsection.has_key?('time_format')
subsection['time_type'] ||= 'string'
end
end
end

@parser = setup_parser(conf)
end
super

def setup_parser(conf)
case @format
when 'tsv'
if @keys.empty?
raise Fluent::ConfigError, "keys option is required on exec input for tsv format"
end
Fluent::ExecUtil::TSVParser.new(@keys, method(:on_message))
when 'json'
Fluent::ExecUtil::JSONParser.new(method(:on_message))
when 'msgpack'
Fluent::ExecUtil::MessagePackParser.new(method(:on_message))
else
Fluent::ExecUtil::TextParserWrapperParser.new(conf, method(:on_message))
if !@tag && (!@extract_config || !@extract_config.tag_key)
raise Fluent::ConfigError, "'tag' or 'tag_key' option is required on exec input"
end
@parser = parser_create
end

def start
super

if @run_interval
child_process_execute(:exec_input, @command, interval: @run_interval, mode: [:read]) do |io|
run(io)
end
child_process_execute(:exec_input, @command, interval: @run_interval, mode: [:read], &method(:run))
else
child_process_execute(:exec_input, @command, immediate: true, mode: [:read]) do |io|
run(io)
end
child_process_execute(:exec_input, @command, immediate: true, mode: [:read], &method(:run))
end
end

def run(io)
@parser.call(io)
end

private

def on_message(record, parsed_time = nil)
if val = record.delete(@tag_key)
tag = val
else
tag = @tag
end

if parsed_time
time = parsed_time
else
if val = record.delete(@time_key)
time = @time_parse_proc.call(val)
else
time = Fluent::EventTime.now
case
when @parser.implement?(:parse_io)
@parser.parse_io(io, &method(:on_record))
when @parser.implement?(:parse_partial_data)
until io.eof?
@parser.parse_partial_data(io.readpartial(@read_block_size), &method(:on_record))
end
when @parser.parser_type == :text_per_line
io.each_line do |line|
@parser.parse(line.chomp, &method(:on_record))
end
else
@parser.parse(io.read, &method(:on_record))
end
end

def on_record(time, record)
tag = extract_tag_from_record(record)
tag ||= @tag
time ||= extract_time_from_record(record) || Fluent::EventTime.now
router.emit(tag, time, record)
rescue => e
log.error "exec failed to emit", error: e, tag: tag, record: Yajl.dump(record)
log.error "exec failed to emit", tag: tag, record: Yajl.dump(record), error: e
router.emit_error_event(tag, time, record, e) if tag && time && record
end
end
end
Loading