From 53459ad4d1466487d8d945b7d2405c71e589a3b8 Mon Sep 17 00:00:00 2001 From: Kenji Okimoto Date: Fri, 16 Oct 2015 15:41:02 +0900 Subject: [PATCH 001/193] Add `config_set_desc` We can write description of plugin parameter as following: ``` class Fluent::SomeInput < Fluent::Input config_set_desc :tag, "This value is the tag assigned to the generated events." config_param :tag, :string end ``` --- lib/fluent/configurable.rb | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/fluent/configurable.rb b/lib/fluent/configurable.rb index 24b1fbff60..a42b9e0470 100644 --- a/lib/fluent/configurable.rb +++ b/lib/fluent/configurable.rb @@ -105,6 +105,10 @@ def config_set_default(name, defval) configure_proxy(self.name).config_set_default(name, defval) end + def config_set_desc(name, desc) + configure_proxy(self.name).config_set_desc(name, desc) + end + def config_section(name, *args, &block) configure_proxy(self.name).config_section(name, *args, &block) attr_accessor configure_proxy(self.name).sections[name].param_name From 59225ed66931b9bc2a2abfdeaad0e5aeea62a2f7 Mon Sep 17 00:00:00 2001 From: Kenji Okimoto Date: Fri, 16 Oct 2015 16:06:12 +0900 Subject: [PATCH 002/193] Add `desc` method We can write description of plugin parameter as following: ``` class Fluent::SomeInput < Fluent::Input desc "This value is the tag assigned to the generated events." config_param :tag, :string end ``` --- lib/fluent/config/configure_proxy.rb | 9 ++++++ lib/fluent/configurable.rb | 4 +++ test/config/test_configure_proxy.rb | 42 ++++++++++++++++++++++++++++ 3 files changed, 55 insertions(+) diff --git a/lib/fluent/config/configure_proxy.rb b/lib/fluent/config/configure_proxy.rb index c91b87b967..1e9b1d744f 100644 --- a/lib/fluent/config/configure_proxy.rb +++ b/lib/fluent/config/configure_proxy.rb @@ -178,6 +178,11 @@ def config_argument(name, *args, &block) def config_param(name, *args, &block) name, block, opts = parameter_configuration(name, *args, &block) + if @current_description + config_set_desc(name, @current_description) + @current_description = nil + end + @sections.delete(name) @params[name] = [block, opts] name @@ -205,6 +210,10 @@ def config_set_desc(name, description) nil end + def desc(description) + @current_description = description + end + def config_section(name, *args, &block) unless block_given? raise ArgumentError, "#{self.name}: config_section requires block parameter" diff --git a/lib/fluent/configurable.rb b/lib/fluent/configurable.rb index a42b9e0470..e7f3f95284 100644 --- a/lib/fluent/configurable.rb +++ b/lib/fluent/configurable.rb @@ -114,6 +114,10 @@ def config_section(name, *args, &block) attr_accessor configure_proxy(self.name).sections[name].param_name end + def desc(description) + configure_proxy(self.name).desc(description) + end + def merged_configure_proxy configurables = ancestors.reverse.select{ |a| a.respond_to?(:configure_proxy) } diff --git a/test/config/test_configure_proxy.rb b/test/config/test_configure_proxy.rb index 18ff36911c..27b5236c2e 100644 --- a/test/config/test_configure_proxy.rb +++ b/test/config/test_configure_proxy.rb @@ -112,6 +112,26 @@ class TestConfigureProxy < ::Test::Unit::TestCase end end + sub_test_case '#desc' do + setup do + @proxy = Fluent::Config::ConfigureProxy.new(:section) + end + + test 'permit to specify description twice' do + @proxy.desc("description1") + @proxy.desc("description2") + @proxy.config_param(:name, :string) + assert_equal("description2", @proxy.descriptions[:name]) + end + + test 'does not permit description specification twice' do + @proxy.desc("description1") + assert_raise(ArgumentError) do + @proxy.config_param(:name, :string, :desc => "description2") + end + end + end + sub_test_case '#dump' do setup do @proxy = Fluent::Config::ConfigureProxy.new(:section) @@ -206,6 +226,28 @@ class TestConfigureProxy < ::Test::Unit::TestCase end assert_equal(< # desc1 + name2: string: <"name2"> # desc2 + sub2 + name3: string: <"name3"> + name4: string: <"name4"> # desc4 +CONFIG + end + + test 'sub proxy w/ desc method' do + @proxy.config_section(:sub) do + desc("desc1") + config_param(:name1, :string, default: "name1") + config_param(:name2, :string, default: "name2", desc: "desc2") + config_section(:sub2) do + config_param(:name3, :string, default: "name3") + desc("desc4") + config_param(:name4, :string, default: "name4") + end + end + assert_equal(< # desc1 name2: string: <"name2"> # desc2 From 76d5292ad643765273bba7036c4c6a107d926a20 Mon Sep 17 00:00:00 2001 From: Kenji Okimoto Date: Fri, 16 Oct 2015 16:08:10 +0900 Subject: [PATCH 003/193] Add descriptions to in_dummy plugin parameters --- lib/fluent/plugin/in_dummy.rb | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/fluent/plugin/in_dummy.rb b/lib/fluent/plugin/in_dummy.rb index 5f2c0c9d17..00492ce006 100644 --- a/lib/fluent/plugin/in_dummy.rb +++ b/lib/fluent/plugin/in_dummy.rb @@ -20,9 +20,13 @@ class DummyInput < Input BIN_NUM = 10 + desc "The value is the tag assigned to the generated events." config_param :tag, :string + desc "It configures how many events to generate per second." config_param :rate, :integer, :default => 1 + desc "If specified, each generated event has an auto-incremented key field." config_param :auto_increment_key, :string, :default => nil + desc "The dummy data to be generated. An array of JSON hashes or a single JSON hash." config_param :dummy, :default => [{"message"=>"dummy"}] do |val| begin parsed = JSON.parse(val) From 99e30ec93f36d06df05b32853066c48dbbce4659 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Wed, 21 Oct 2015 11:18:49 -0600 Subject: [PATCH 004/193] Doc: update README with Slack Team info Signed-off-by: Eduardo Silva --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 39735ded80..a853d77a1a 100644 --- a/README.md +++ b/README.md @@ -38,6 +38,7 @@ Mobile/Web Application Logging | Fluentd can function as middleware to enable as - Documentation: http://docs.fluentd.org/ - Source repository: http://github.com/fluent - Discussion: http://groups.google.com/group/fluentd +- Slack / Community: http://slack.fluentd.org - Newsletters: http://get.treasuredata.com/Fluentd_education - Author: Sadayuki Furuhashi - Copyright: (c) 2011 FURUHASHI Sadayuki @@ -48,4 +49,3 @@ Mobile/Web Application Logging | Fluentd can function as middleware to enable as Patches contributed by [great developers](https://github.com/fluent/fluentd/contributors). [](https://github.com/fluent/fluentd) - From 95e0151c1394818a8ea579966c998945541ca064 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Mon, 26 Oct 2015 16:00:50 +0900 Subject: [PATCH 005/193] Don't use popular name to prevent field overwrite when enable_ruby is true --- lib/fluent/plugin/filter_record_transformer.rb | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/lib/fluent/plugin/filter_record_transformer.rb b/lib/fluent/plugin/filter_record_transformer.rb index f61d4f3598..8129587ba9 100644 --- a/lib/fluent/plugin/filter_record_transformer.rb +++ b/lib/fluent/plugin/filter_record_transformer.rb @@ -230,18 +230,17 @@ def prepare_placeholders(time, record, opts) @placeholders = struct end - def expand(str, force_stringify=false) + def expand(_str_for_eval_, force_stringify=false) if @auto_typecast and !force_stringify - single_placeholder_matched = str.match(/\A\${([^}]+)}\z/) - if single_placeholder_matched - code = single_placeholder_matched[1] - return eval code, @placeholders.instance_eval { binding } + _single_placeholder_matched_ = _str_for_eval_.match(/\A\${([^}]+)}\z/) + if _single_placeholder_matched_ + return eval _single_placeholder_matched_[1], @placeholders.instance_eval { binding } end end - interpolated = str.gsub(/\$\{([^}]+)\}/, '#{\1}') # ${..} => #{..} - eval "\"#{interpolated}\"", @placeholders.instance_eval { binding } + _interpolated_for_eval_ = _str_for_eval_.gsub(/\$\{([^}]+)\}/, '#{\1}') # ${..} => #{..} + eval "\"#{_interpolated_for_eval_}\"", @placeholders.instance_eval { binding } rescue => e - log.warn "failed to expand `#{str}`", :error_class => e.class, :error => e.message + log.warn "failed to expand `#{_str_for_eval_}`", :error_class => e.class, :error => e.message log.warn_backtrace nil end From e90135a0fe47a0cf7b9cc867899be095dd9de6d9 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Fri, 4 Sep 2015 21:31:21 +0900 Subject: [PATCH 006/193] in_monitor_agent emits plugins info to use exisiting plugins. fix #667 --- lib/fluent/plugin/in_monitor_agent.rb | 64 ++++++++++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin/in_monitor_agent.rb b/lib/fluent/plugin/in_monitor_agent.rb index 6f609a12d3..18b3cd4d82 100644 --- a/lib/fluent/plugin/in_monitor_agent.rb +++ b/lib/fluent/plugin/in_monitor_agent.rb @@ -27,6 +27,8 @@ def initialize config_param :bind, :string, :default => '0.0.0.0' config_param :port, :integer, :default => 24220 + config_param :tag, :string, :default => nil + config_param :emit_interval, :time, :default => 60 class MonitorServlet < WEBrick::HTTPServlet::AbstractServlet def initialize(server, agent) @@ -199,6 +201,36 @@ def process(req, res) end end + class TimerWatcher < Coolio::TimerWatcher + def initialize(interval, log, &callback) + @callback = callback + @log = log + + # Avoid long shutdown time + @num_call = 0 + if interval >= 10 + min_interval = 10 + @call_interval = interval / 10 + else + min_interval = interval + @call_interval = 0 + end + + super(min_interval, true) + end + + def on_timer + @num_call += 1 + if @num_call >= @call_interval + @num_call = 0 + @callback.call + end + rescue => e + @log.error e.to_s + @log.error_backtrace + end + end + def start log.debug "listening monitoring http server on http://#{@bind}:#{@port}/api/plugins" @srv = WEBrick::HTTPServer.new({ @@ -214,6 +246,29 @@ def start @thread = Thread.new { @srv.start } + if @tag + log.debug "tag parameter is specified. Emit plugins info to '#{@tag}'" + + @loop = Coolio::Loop.new + opts = {:with_config => false} + timer = TimerWatcher.new(@emit_interval, log) { + es = MultiEventStream.new + now = Engine.now + plugins_info_all(opts).each { |record| + es.add(now, record) + } + router.emit_stream(@tag, es) + } + @loop.attach(timer) + @thread_for_emit = Thread.new(&method(:run)) + end + end + + def run + @loop.run + rescue => e + log.error "unexpected error", :error => e.to_s + log.error_backtrace end def shutdown @@ -225,6 +280,13 @@ def shutdown @thread.join @thread = nil end + if @tag + @loop.watchers.each { |w| w.detach } + @loop.stop + @loop = nil + @thread_for_emit.join + @thread_for_emit = nil + end end MONITOR_INFO = { @@ -320,7 +382,7 @@ def get_monitor_info(pe, opts={}) obj['plugin_id'] = pe.plugin_id obj['plugin_category'] = plugin_category(pe) obj['type'] = pe.config['@type'] || pe.config['type'] - obj['config'] = pe.config + obj['config'] = pe.config if !opts.has_key?(:with_config) || opts[:with_config] # run MONITOR_INFO in plugins' instance context and store the info to obj MONITOR_INFO.each_pair {|key,code| From 64b613f02dcb07c278211d6697af7526b195e1e7 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Tue, 27 Oct 2015 18:41:42 +0900 Subject: [PATCH 007/193] Log shutdown plugin info to check shutdown sequence --- lib/fluent/agent.rb | 2 ++ lib/fluent/root_agent.rb | 1 + 2 files changed, 3 insertions(+) diff --git a/lib/fluent/agent.rb b/lib/fluent/agent.rb index b2cc379ad1..5080f2377c 100644 --- a/lib/fluent/agent.rb +++ b/lib/fluent/agent.rb @@ -78,6 +78,7 @@ def shutdown @started_filters.map { |f| Thread.new do begin + log.info "shutting down filter#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(f.class), plugin_id: f.plugin_id f.shutdown rescue => e log.warn "unexpected error while shutting down filter plugins", :plugin => f.class, :plugin_id => f.plugin_id, :error_class => e.class, :error => e @@ -91,6 +92,7 @@ def shutdown @started_outputs.map { |o| Thread.new do begin + log.info "shutting down output#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(o.class), plugin_id: o.plugin_id o.shutdown rescue => e log.warn "unexpected error while shutting down output plugins", :plugin => o.class, :plugin_id => o.plugin_id, :error_class => e.class, :error => e diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index 2368e1274c..a0851c78b7 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -118,6 +118,7 @@ def shutdown @started_inputs.map { |i| Thread.new do begin + log.info "shutting down input", type: Plugin.lookup_name_from_class(i.class), plugin_id: i.plugin_id i.shutdown rescue => e log.warn "unexpected error while shutting down input plugin", :plugin => i.class, :plugin_id => i.plugin_id, :error_class => e.class, :error => e From fce745f992461dde8ea37b7a26989dd8d35f8a28 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Wed, 28 Oct 2015 17:26:22 +0900 Subject: [PATCH 008/193] Tempfile should be binary mode --- lib/fluent/plugin/out_exec.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/fluent/plugin/out_exec.rb b/lib/fluent/plugin/out_exec.rb index 28adca6adf..baf7f6e276 100644 --- a/lib/fluent/plugin/out_exec.rb +++ b/lib/fluent/plugin/out_exec.rb @@ -81,6 +81,7 @@ def write(chunk) prog = "#{@command} #{chunk.path}" else tmpfile = Tempfile.new("fluent-plugin-exec-") + tmpfile.binmode chunk.write_to(tmpfile) tmpfile.close prog = "#{@command} #{tmpfile.path}" From 988dbe867def28f38b629ab8305347fa7f8194d5 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Thu, 29 Oct 2015 16:19:18 +0900 Subject: [PATCH 009/193] Forgot to add v0.12 test in travis-ci --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index dcfa243458..9752cc3e68 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,6 +16,7 @@ branches: only: - master - v0.10 + - v0.12 - v0.14 gemfile: From aac0b72f00b8804f4a35f4a4659d0350ca861814 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Thu, 29 Oct 2015 16:13:34 +0900 Subject: [PATCH 010/193] Add Engine#msgpack_factory for v0.14 compatibility --- lib/fluent/buffer.rb | 2 +- lib/fluent/command/cat.rb | 2 +- lib/fluent/engine.rb | 13 +++++++++++++ lib/fluent/event.rb | 4 ++-- lib/fluent/plugin/buf_memory.rb | 2 +- lib/fluent/plugin/exec_util.rb | 2 +- lib/fluent/plugin/in_forward.rb | 2 +- lib/fluent/plugin/in_http.rb | 2 +- lib/fluent/plugin/in_stream.rb | 2 +- lib/fluent/plugin/out_stream.rb | 2 +- lib/fluent/process.rb | 2 +- test/plugin/test_in_forward.rb | 14 +++++++------- test/plugin/test_in_stream.rb | 10 +++++----- test/plugin/test_out_copy.rb | 2 +- 14 files changed, 37 insertions(+), 24 deletions(-) diff --git a/lib/fluent/buffer.rb b/lib/fluent/buffer.rb index 71d0443b2a..4b06de4bf3 100644 --- a/lib/fluent/buffer.rb +++ b/lib/fluent/buffer.rb @@ -112,7 +112,7 @@ def write_to(io) def msgpack_each(&block) open {|io| - u = MessagePack::Unpacker.new(io) + u = Fluent::Engine.msgpack_factory.unpacker(io) begin u.each(&block) rescue EOFError diff --git a/lib/fluent/command/cat.rb b/lib/fluent/command/cat.rb index db7c0ed80f..49f4aaa469 100644 --- a/lib/fluent/command/cat.rb +++ b/lib/fluent/command/cat.rb @@ -289,7 +289,7 @@ def abort_message(time, record) when 'msgpack' begin - u = MessagePack::Unpacker.new($stdin) + u = Fluent::Engine.msgpack_factory.unpacker($stdin) u.each {|record| w.write(record) } diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index c347e7a4eb..49e3be2229 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -19,6 +19,16 @@ module Fluent require 'fluent/root_agent' class EngineClass + class DummyMessagePackFactory + def packer(io = nil) + MessagePack::Packer.new(io) + end + + def unpacker(io = nil) + MessagePack::Unpacker.new(io) + end + end + def initialize @root_agent = nil @event_router = nil @@ -30,6 +40,8 @@ def initialize @log_event_queue = [] @suppress_config_dump = false + + @msgpack_factory = DummyMessagePackFactory.new end MATCH_CACHE_SIZE = 1024 @@ -37,6 +49,7 @@ def initialize attr_reader :root_agent attr_reader :matches, :sources + attr_reader :msgpack_factory def init(opts = {}) BasicSocket.do_not_reverse_lookup = true diff --git a/lib/fluent/event.rb b/lib/fluent/event.rb index 252fe92f72..f07bfd1a1e 100644 --- a/lib/fluent/event.rb +++ b/lib/fluent/event.rb @@ -27,7 +27,7 @@ def each(&block) end def to_msgpack_stream - out = MessagePack::Packer.new # MessagePack::Packer is fastest way to serialize events + out = Fluent::Engine.msgpack_factory.packer each {|time,record| out.write([time,record]) } @@ -143,7 +143,7 @@ def repeatable? def each(&block) # TODO format check - unpacker = MessagePack::Unpacker.new + unpacker = Fluent::Engine.msgpack_factory.unpacker unpacker.feed_each(@data, &block) nil end diff --git a/lib/fluent/plugin/buf_memory.rb b/lib/fluent/plugin/buf_memory.rb index 4773d1707e..f725d503d5 100644 --- a/lib/fluent/plugin/buf_memory.rb +++ b/lib/fluent/plugin/buf_memory.rb @@ -57,7 +57,7 @@ def write_to(io) # optimize def msgpack_each(&block) - u = MessagePack::Unpacker.new + u = Fluent::Engine.msgpack_factory.unpacker u.feed_each(@data, &block) end end diff --git a/lib/fluent/plugin/exec_util.rb b/lib/fluent/plugin/exec_util.rb index 35fd05853a..7fe262ecf6 100644 --- a/lib/fluent/plugin/exec_util.rb +++ b/lib/fluent/plugin/exec_util.rb @@ -58,7 +58,7 @@ def call(io) class MessagePackParser < Parser def call(io) - @u = MessagePack::Unpacker.new(io) + @u = Fluent::Engine.msgpack_factory.unpacker(io) begin @u.each(&@on_message) rescue EOFError diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index 2bfcee690d..0a937cbd61 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -219,7 +219,7 @@ def on_read(data) else m = method(:on_read_msgpack) @serializer = :to_msgpack.to_proc - @u = MessagePack::Unpacker.new + @u = Fluent::Engine.msgpack_factory.unpacker end (class << self; self; end).module_eval do diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index 2285100501..c91c16b47a 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -191,7 +191,7 @@ def on_request(path_info, params) def parse_params_default(params) record = if msgpack = params['msgpack'] - MessagePack.unpack(msgpack) + Engine.msgpack_factory.unpacker.feed(msgpack).read elsif js = params['json'] JSON.parse(js) else diff --git a/lib/fluent/plugin/in_stream.rb b/lib/fluent/plugin/in_stream.rb index ce2f72e636..e254531138 100644 --- a/lib/fluent/plugin/in_stream.rb +++ b/lib/fluent/plugin/in_stream.rb @@ -130,7 +130,7 @@ def on_read(data) @y.on_parse_complete = @on_message else m = method(:on_read_msgpack) - @u = MessagePack::Unpacker.new + @u = Fluent::Engine.msgpack_factory.unpacker end (class << self; self; end).module_eval do diff --git a/lib/fluent/plugin/out_stream.rb b/lib/fluent/plugin/out_stream.rb index f5d51bd79c..41db049c6d 100644 --- a/lib/fluent/plugin/out_stream.rb +++ b/lib/fluent/plugin/out_stream.rb @@ -65,7 +65,7 @@ def write(chunk) chain = NullOutputChain.instance chunk.open {|io| # TODO use MessagePackIoEventStream - u = MessagePack::Unpacker.new(io) + u = Fluent::Engine.msgpack_factory.unpacker(io) begin u.each {|(tag,entries)| es = MultiEventStream.new diff --git a/lib/fluent/process.rb b/lib/fluent/process.rb index 19a31c7e8c..f2b04e9058 100644 --- a/lib/fluent/process.rb +++ b/lib/fluent/process.rb @@ -176,7 +176,7 @@ def input_forward_main(ipr, pid) end def read_event_stream(r, &block) - u = MessagePack::Unpacker.new(r) + u = Fluent::Engine.msgpack_factory.unpacker(r) begin #buf = '' #map = {} diff --git a/test/plugin/test_in_forward.rb b/test/plugin/test_in_forward.rb index 329725f19e..a012ba7214 100644 --- a/test/plugin/test_in_forward.rb +++ b/test/plugin/test_in_forward.rb @@ -59,7 +59,7 @@ def test_message d.run do d.expected_emits.each {|tag,time,record| - send_data [tag, time, record].to_msgpack + send_data Fluent::Engine.msgpack_factory.packer.write([tag, time, record]).to_s } end end @@ -77,7 +77,7 @@ def test_forward d.expected_emits.each {|tag,time,record| entries << [time, record] } - send_data ["tag1", entries].to_msgpack + send_data Fluent::Engine.msgpack_factory.packer.write(["tag1", entries]).to_s end end @@ -92,9 +92,9 @@ def test_packed_forward d.run do entries = '' d.expected_emits.each {|tag,time,record| - [time, record].to_msgpack(entries) + Fluent::Engine.msgpack_factory.packer(entries).write([time, record]).flush } - send_data ["tag1", entries].to_msgpack + send_data Fluent::Engine.msgpack_factory.packer.write(["tag1", entries]).to_s end end @@ -128,7 +128,7 @@ def test_send_large_chunk_warning assert chunk.size < (32 * 1024 * 1024) d.run do - MessagePack::Unpacker.new.feed_each(chunk) do |obj| + Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj| d.instance.send(:on_message, obj, chunk.size, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000") end end @@ -157,7 +157,7 @@ def test_send_large_chunk_only_warning chunk = [ "test.tag", (0...16).map{|i| [time + i, {"data" => str}] } ].to_msgpack d.run do - MessagePack::Unpacker.new.feed_each(chunk) do |obj| + Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj| d.instance.send(:on_message, obj, chunk.size, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000") end end @@ -184,7 +184,7 @@ def test_send_large_chunk_limit # d.run => send_data d.run do - MessagePack::Unpacker.new.feed_each(chunk) do |obj| + Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj| d.instance.send(:on_message, obj, chunk.size, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000") end end diff --git a/test/plugin/test_in_stream.rb b/test/plugin/test_in_stream.rb index 0c89c806a7..5eb13c5b8a 100644 --- a/test/plugin/test_in_stream.rb +++ b/test/plugin/test_in_stream.rb @@ -17,7 +17,7 @@ def test_time d.run do d.expected_emits.each {|tag,time,record| - send_data [tag, 0, record].to_msgpack + send_data Fluent::Engine.msgpack_factory.packer.write([tag, 0, record]).to_s } end end @@ -32,7 +32,7 @@ def test_message d.run do d.expected_emits.each {|tag,time,record| - send_data [tag, time, record].to_msgpack + send_data Fluent::Engine.msgpack_factory.packer.write([tag, time, record]).to_s } end end @@ -50,7 +50,7 @@ def test_forward d.expected_emits.each {|tag,time,record| entries << [time, record] } - send_data ["tag1", entries].to_msgpack + send_data Fluent::Engine.msgpack_factory.packer.write(["tag1", entries]).to_s end end @@ -65,9 +65,9 @@ def test_packed_forward d.run do entries = '' d.expected_emits.each {|tag,time,record| - [time, record].to_msgpack(entries) + Fluent::Engine.msgpack_factory.packer(entries).write([time, record]).flush } - send_data ["tag1", entries].to_msgpack + send_data Fluent::Engine.msgpack_factory.packer.write(["tag1", entries]).to_s end end diff --git a/test/plugin/test_out_copy.rb b/test/plugin/test_out_copy.rb index 4ff5523019..04725d198c 100644 --- a/test/plugin/test_out_copy.rb +++ b/test/plugin/test_out_copy.rb @@ -86,7 +86,7 @@ def test_msgpack_es_emit_bug es = if defined?(MessagePack::Packer) time = Time.parse("2013-05-26 06:37:22 UTC").to_i - packer = MessagePack::Packer.new + packer = Fluent::Engine.msgpack_factory.packer packer.pack([time, {"a" => 1}]) packer.pack([time, {"a" => 2}]) Fluent::MessagePackEventStream.new(packer.to_s) From c3185fc6e07aaab52a64ba1937287860f17fe540 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Fri, 30 Oct 2015 15:25:52 +0900 Subject: [PATCH 011/193] Use *args to forward all method arguments --- lib/fluent/engine.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index 49e3be2229..046a2d98af 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -20,12 +20,12 @@ module Fluent class EngineClass class DummyMessagePackFactory - def packer(io = nil) - MessagePack::Packer.new(io) + def packer(*args) + MessagePack::Packer.new(*args) end - def unpacker(io = nil) - MessagePack::Unpacker.new(io) + def unpacker(*args) + MessagePack::Unpacker.new(*args) end end From d6711233dd90f176760cf6adfa7d2202d3da0a4d Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Wed, 4 Nov 2015 20:02:59 +0900 Subject: [PATCH 012/193] Update test-unit gem to 3.1.4 --- fluentd.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fluentd.gemspec b/fluentd.gemspec index 3f3f08800d..dc9f6a3db2 100644 --- a/fluentd.gemspec +++ b/fluentd.gemspec @@ -35,6 +35,6 @@ Gem::Specification.new do |gem| gem.add_development_dependency("simplecov", ["~> 0.6.4"]) gem.add_development_dependency("rr", [">= 1.0.0"]) gem.add_development_dependency("timecop", [">= 0.3.0"]) - gem.add_development_dependency("test-unit", ["~> 3.0.2"]) + gem.add_development_dependency("test-unit", ["~> 3.1.4"]) gem.add_development_dependency("test-unit-rr", ["~> 1.0.3"]) end From f03ee90254be76befd471ac1f4749012a5b7e941 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Wed, 4 Nov 2015 23:02:50 +0900 Subject: [PATCH 013/193] v0.12.17 --- ChangeLog | 21 +++++++++++++++++++++ lib/fluent/version.rb | 2 +- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/ChangeLog b/ChangeLog index 7737f8588a..8f37e68cff 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,26 @@ # v0.12 +## Release 0.12.17 - 2015/11/04 + +### New features / Enhancement + +* Engine: Add Engine#msgpack_factory for v0.14 compatibility + https://github.com/fluent/fluentd/pull/693 +* Log shutdown plugin info to check shutdown sequence + https://github.com/fluent/fluentd/pull/689 +* in_monitor_agent: Emit plugin info to use existing plugins + https://github.com/fluent/fluentd/pull/670 +* config: Improve describing plugin parameters + https://github.com/fluent/fluentd/pull/683 + +### Bug fixes + +* Tempfile should be binary mode + https://github.com/fluent/fluentd/pull/691 +* filter_record_transformer: Don't use popular name to prevent field overwrite when enable_ruby is true + https://github.com/fluent/fluentd/pull/687 + + ## Release 0.12.16 - 2015/09/30 ### New features / Enhancement diff --git a/lib/fluent/version.rb b/lib/fluent/version.rb index 77dcd1534a..0a399e8918 100644 --- a/lib/fluent/version.rb +++ b/lib/fluent/version.rb @@ -16,6 +16,6 @@ module Fluent - VERSION = '0.12.16' + VERSION = '0.12.17' end From 694af8baa967e91f1f7992f47ef4bb7d136b95e6 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Fri, 13 Nov 2015 19:50:02 +0900 Subject: [PATCH 014/193] Merge pull request #701 from cosmo0920/add-descriptions-out_file Add descriptions to out_file plugin parameters --- lib/fluent/plugin/out_file.rb | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/fluent/plugin/out_file.rb b/lib/fluent/plugin/out_file.rb index 37b5718ecd..d5ba98ee14 100644 --- a/lib/fluent/plugin/out_file.rb +++ b/lib/fluent/plugin/out_file.rb @@ -23,9 +23,13 @@ class FileOutput < TimeSlicedOutput 'gzip' => :gz, } + desc "The Path of the file." config_param :path, :string + desc "The format of the file content. The default is out_file." config_param :format, :string, :default => 'out_file' + desc "The flushed chunk is appended to existence file or not." config_param :append, :bool, :default => false + desc "Compress flushed file." config_param :compress, :default => nil do |val| c = SUPPORTED_COMPRESS[val] unless c @@ -33,6 +37,7 @@ class FileOutput < TimeSlicedOutput end c end + desc "Create symlink to temporary buffered file when buffer_type is file." config_param :symlink_path, :string, :default => nil def initialize From a9131dbcda08781f4b3d686ec2ca7bc50882d30f Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Tue, 24 Nov 2015 15:09:18 +0900 Subject: [PATCH 015/193] Merge pull request #707 from fluent/forward-reject-empty out_forward raises ZeroDivisionError when no is available --- lib/fluent/plugin/out_forward.rb | 4 ++++ test/plugin/test_out_forward.rb | 6 ++++++ 2 files changed, 10 insertions(+) diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index b5e3976684..58a97d5599 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -129,6 +129,10 @@ def configure(conf) end log.info "adding forwarding server '#{name}'", :host=>host, :port=>port, :weight=>weight, :plugin_id=>plugin_id } + + if @nodes.empty? + raise ConfigError, "forward output plugin requires at least one is required" + end end def start diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index c3a4dc2076..f2db81895e 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -78,6 +78,12 @@ def test_configure_dns_round_robin assert_equal true, d.instance.nodes.first.conf.dns_round_robin end + def test_configure_no_server + assert_raise(Fluent::ConfigError, 'forward output plugin requires at least one is required') do + create_driver('') + end + end + def test_phi_failure_detector d = create_driver(CONFIG + %[phi_failure_detector false \n phi_threshold 0]) node = d.instance.nodes.first From 677278417bceb588480d10c96e328c000c54ea72 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Thu, 26 Nov 2015 01:17:22 +0900 Subject: [PATCH 016/193] Merge pull request #708 from fluent/support-process-name-configuration Add process_name parameter in system to change fluentd's process name --- lib/fluent/supervisor.rb | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index 66588f8274..fcc915484e 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -111,6 +111,7 @@ def initialize(opt) @chgroup = opt[:chgroup] @chuser = opt[:chuser] @rpc_server = nil + @process_name = nil @log_level = opt[:log_level] @suppress_interval = opt[:suppress_interval] @@ -306,6 +307,7 @@ def stop_rpc_server def supervise(&block) start_time = Time.now + Process.setproctitle("supervisor:#{@process_name}") if @process_name $log.info "starting fluentd-#{Fluent::VERSION}" @main_pid = fork do main_process(&block) @@ -332,6 +334,8 @@ def supervise(&block) end def main_process(&block) + Process.setproctitle("worker:#{@process_name}") if @process_name + begin block.call @@ -462,6 +466,7 @@ class SystemConfig config_param :without_source, :bool, :default => nil config_param :rpc_endpoint, :string, :default => nil config_param :enable_get_dump, :bool, :default => nil + config_param :process_name, :default => nil def initialize(conf) super() @@ -478,6 +483,7 @@ def apply(supervisor) @without_source = system.without_source unless system.without_source.nil? @rpc_endpoint = system.rpc_endpoint unless system.rpc_endpoint.nil? @enable_get_dump = system.enable_get_dump unless system.enable_get_dump.nil? + @process_name = system.process_name unless system.process_name.nil? } end end From 4e7e8019a542972e6de50a7137dfe27848f7ddf2 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Thu, 26 Nov 2015 01:17:50 +0900 Subject: [PATCH 017/193] Merge pull request #702 from cosmo0920/add-parser-test-driver --- lib/fluent/test.rb | 1 + lib/fluent/test/parser_test.rb | 66 ++++++++++++++++++++++++++++++++++ test/test_parser.rb | 62 ++++++++++++++++++++++---------- 3 files changed, 110 insertions(+), 19 deletions(-) create mode 100644 lib/fluent/test/parser_test.rb diff --git a/lib/fluent/test.rb b/lib/fluent/test.rb index 150230e1c1..a6edd9395a 100644 --- a/lib/fluent/test.rb +++ b/lib/fluent/test.rb @@ -20,5 +20,6 @@ require 'fluent/test/input_test' require 'fluent/test/output_test' require 'fluent/test/filter_test' +require 'fluent/test/parser_test' $log ||= Fluent::Log.new(Fluent::Test::DummyLogDevice.new) diff --git a/lib/fluent/test/parser_test.rb b/lib/fluent/test/parser_test.rb new file mode 100644 index 0000000000..949ab833fa --- /dev/null +++ b/lib/fluent/test/parser_test.rb @@ -0,0 +1,66 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Fluent + module Test + class ParserTestDriver + def initialize(klass_or_str, format=nil, conf={}, &block) + if klass_or_str.is_a?(Class) + if block + # Create new class for test w/ overwritten methods + # klass.dup is worse because its ancestors does NOT include original class name + klass_or_str = Class.new(klass_or_str) + klass_or_str.module_eval(&block) + end + case klass_or_str.instance_method(:initialize).arity + when 0 + @instance = klass_or_str.new + when -2 + # for RegexpParser + @instance = klass_or_str.new(format, conf) + end + elsif klass_or_str.is_a?(String) + @instance = TextParser::TEMPLATE_REGISTRY.lookup(klass_or_str).call + else + @instance = klass_or_str + end + @config = Config.new + end + + attr_reader :instance, :config + + def configure(conf) + case conf + when Fluent::Config::Element + @config = conf + when String + io = StringIO.new(conf) + @config = Config::Parser.parse(io, 'fluent.conf') + when Hash + @config = Config::Element.new('ROOT', '', conf, []) + else + raise "Unknown type... #{conf}" + end + @instance.configure(@config) + self + end + + def parse(text, &block) + @instance.parse(text, &block) + end + end + end +end diff --git a/test/test_parser.rb b/test/test_parser.rb index ead7d095fc..b18181456d 100644 --- a/test/test_parser.rb +++ b/test/test_parser.rb @@ -39,6 +39,31 @@ def test_call end end + class BaseParserTestWithTestDriver < ::Test::Unit::TestCase + include ParserTest + + def create_driver(conf={}) + Fluent::Test::ParserTestDriver.new(Fluent::Parser).configure(conf) + end + + def test_init + d = create_driver + assert_true d.instance.estimate_current_event + end + + def test_configure_against_string_literal + d = create_driver('keep_time_key true') + assert_true d.instance.keep_time_key + end + + def test_parse + d = create_driver + assert_raise NotImplementedError do + d.parse('') + end + end + end + class TimeParserTest < ::Test::Unit::TestCase include ParserTest @@ -94,19 +119,19 @@ def test_parse_with_typed def test_parse_with_configure # Specify conf by configure method instaed of intializer regexp = Regexp.new(%q!^(?[^ ]*) [^ ]* (?[^ ]*) \[(?