diff --git a/.travis.yml b/.travis.yml index dcfa243458..85e250f485 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,6 @@ language: ruby rvm: - - 1.9.3 - 2.0.0 - 2.1 - 2.2 diff --git a/fluentd.gemspec b/fluentd.gemspec index 10f1775d90..b0bc9928fb 100644 --- a/fluentd.gemspec +++ b/fluentd.gemspec @@ -19,7 +19,7 @@ Gem::Specification.new do |gem| gem.required_ruby_version = '>= 1.9.3' - gem.add_runtime_dependency("msgpack", [">= 0.5.11", "< 0.6.0"]) + gem.add_runtime_dependency("msgpack", [">= 0.7.0"]) gem.add_runtime_dependency("json", [">= 1.4.3"]) gem.add_runtime_dependency("yajl-ruby", ["~> 1.0"]) gem.add_runtime_dependency("cool.io", [">= 1.4.1", "< 2.0.0"]) @@ -34,6 +34,7 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency("win32-event", ["~> 0.6.1"]) gem.add_runtime_dependency("windows-pr", ["~> 1.2.3"]) end + gem.add_runtime_dependency("strptime", [">= 0.1.3"]) gem.add_development_dependency("rake", [">= 0.9.2"]) gem.add_development_dependency("flexmock", ["~> 1.3.3"]) diff --git a/lib/fluent/buffer.rb b/lib/fluent/buffer.rb index 71d0443b2a..4b06de4bf3 100644 --- a/lib/fluent/buffer.rb +++ b/lib/fluent/buffer.rb @@ -112,7 +112,7 @@ def write_to(io) def msgpack_each(&block) open {|io| - u = MessagePack::Unpacker.new(io) + u = Fluent::Engine.msgpack_factory.unpacker(io) begin u.each(&block) rescue EOFError diff --git a/lib/fluent/command/cat.rb b/lib/fluent/command/cat.rb index db7c0ed80f..49f4aaa469 100644 --- a/lib/fluent/command/cat.rb +++ b/lib/fluent/command/cat.rb @@ -289,7 +289,7 @@ def abort_message(time, record) when 'msgpack' begin - u = MessagePack::Unpacker.new($stdin) + u = Fluent::Engine.msgpack_factory.unpacker($stdin) u.each {|record| w.write(record) } diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index c347e7a4eb..6952495478 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -30,6 +30,9 @@ def initialize @log_event_queue = [] @suppress_config_dump = false + + @msgpack_factory = MessagePack::Factory.new + @msgpack_factory.register_type(Fluent::EventTime::TYPE, Fluent::EventTime) end MATCH_CACHE_SIZE = 1024 @@ -37,6 +40,7 @@ def initialize attr_reader :root_agent attr_reader :matches, :sources + attr_reader :msgpack_factory def init(opts = {}) BasicSocket.do_not_reverse_lookup = true @@ -132,7 +136,7 @@ def flush! def now # TODO thread update - Time.now.to_i + Fluent::EventTime.now end def log_event_loop diff --git a/lib/fluent/event.rb b/lib/fluent/event.rb index 252fe92f72..b1920de45a 100644 --- a/lib/fluent/event.rb +++ b/lib/fluent/event.rb @@ -27,12 +27,20 @@ def each(&block) end def to_msgpack_stream - out = MessagePack::Packer.new # MessagePack::Packer is fastest way to serialize events + out = Fluent::Engine.msgpack_factory.packer each {|time,record| out.write([time,record]) } out.to_s end + + def to_msgpack_stream_forced_integer + out = Fluent::Engine.msgpack_factory.packer + each {|time,record| + out.write([time.to_i,record]) + } + out.to_s + end end @@ -143,7 +151,7 @@ def repeatable? def each(&block) # TODO format check - unpacker = MessagePack::Unpacker.new + unpacker = Fluent::Engine.msgpack_factory.unpacker unpacker.feed_each(@data, &block) nil end diff --git a/lib/fluent/load.rb b/lib/fluent/load.rb index 1a0d064443..874f3b3b73 100644 --- a/lib/fluent/load.rb +++ b/lib/fluent/load.rb @@ -9,6 +9,7 @@ require 'yajl' require 'uri' require 'msgpack' +require 'strptime' begin require 'sigdump/setup' rescue @@ -16,6 +17,7 @@ end require 'cool.io' +require 'fluent/time' require 'fluent/env' require 'fluent/version' require 'fluent/log' diff --git a/lib/fluent/mixin.rb b/lib/fluent/mixin.rb index ff4b1680d1..a43e69c8be 100644 --- a/lib/fluent/mixin.rb +++ b/lib/fluent/mixin.rb @@ -17,6 +17,7 @@ module Fluent class TimeFormatter require 'fluent/timezone' + require 'fluent/time' def initialize(format, localtime, timezone = nil) @tc1 = 0 @@ -24,6 +25,16 @@ def initialize(format, localtime, timezone = nil) @tc2 = 0 @tc2_str = nil + if format && format =~ /(^|[^%])(%%)*%L|(^|[^%])(%%)*%\d*N/ + define_singleton_method(:format) {|time| + format_with_subsec(time) + } + else + define_singleton_method(:format) {|time| + format_without_subsec(time) + } + end + if formatter = Fluent::Timezone.formatter(timezone, format) define_singleton_method(:format_nocache) {|time| formatter.call(time) @@ -54,7 +65,7 @@ def initialize(format, localtime, timezone = nil) end end - def format(time) + def format_without_subsec(time) if @tc1 == time return @tc1_str elsif @tc2 == time @@ -72,6 +83,28 @@ def format(time) end end + def format_with_subsec(time) + if Fluent::EventTime.eq?(@tc1, time) + return @tc1_str + elsif Fluent::EventTime.eq?(@tc2, time) + return @tc2_str + else + str = format_nocache(time) + if @tc1 < @tc2 + @tc1 = time + @tc1_str = str + else + @tc2 = time + @tc2_str = str + end + return str + end + end + + def format(time) + # will be overridden in initialize + end + def format_nocache(time) # will be overridden in initialize end diff --git a/lib/fluent/output.rb b/lib/fluent/output.rb index 652d711b49..491d8dd34e 100644 --- a/lib/fluent/output.rb +++ b/lib/fluent/output.rb @@ -431,13 +431,19 @@ def flush_secondary(secondary) class ObjectBufferedOutput < BufferedOutput + config_param :time_as_integer, :bool, :default => true + def initialize super end def emit(tag, es, chain) @emit_count += 1 - data = es.to_msgpack_stream + if @time_as_integer + data = es.to_msgpack_stream_forced_integer + else + data = es.to_msgpack_stream + end key = tag if @buffer.emit(key, data, chain) submit_flush @@ -529,7 +535,7 @@ def configure(conf) else @flush_interval = [60, @time_slice_cache_interval].min @enqueue_buffer_proc = Proc.new do - nowslice = @time_slicer.call(Engine.now - @time_slice_wait) + nowslice = @time_slicer.call(Time.now - @time_slice_wait) @buffer.keys.each {|key| if key < nowslice @buffer.push(key) diff --git a/lib/fluent/parser.rb b/lib/fluent/parser.rb index 7f06bca353..bcef71a952 100644 --- a/lib/fluent/parser.rb +++ b/lib/fluent/parser.rb @@ -60,12 +60,18 @@ def initialize(time_format) @cache2_time = nil @parser = if time_format - Proc.new { |value| Time.strptime(value, time_format) } + begin + strptime = Strptime.new(time_format) + Proc.new { |value| Fluent::EventTime.from_time(strptime.exec(value)) } + rescue + Proc.new { |value| Fluent::EventTime.from_time(Time.strptime(value, time_format)) } + end else - Time.method(:parse) + Proc.new { |value| Fluent::EventTime.parse(value) } end end + # TODO: new cache mechanism using format string def parse(value) unless value.is_a?(String) raise ParserError, "value must be string: #{value}" @@ -77,7 +83,7 @@ def parse(value) return @cache2_time else begin - time = @parser.call(value).to_i + time = @parser.call(value) rescue => e raise ParserError, "invalid time format: value = #{value}, error_class = #{e.class.name}, error = #{e.message}" end @@ -256,7 +262,7 @@ def parse(text) time = @mutex.synchronize { @time_parser.parse(value) } else begin - time = value.to_i + time = Fluent::EventTime.from_time(Time.at(value.to_f)) rescue => e raise ParserError, "invalid time value: value = #{value}, error_class = #{e.class.name}, error = #{e.message}" end diff --git a/lib/fluent/plugin/buf_memory.rb b/lib/fluent/plugin/buf_memory.rb index 4773d1707e..f725d503d5 100644 --- a/lib/fluent/plugin/buf_memory.rb +++ b/lib/fluent/plugin/buf_memory.rb @@ -57,7 +57,7 @@ def write_to(io) # optimize def msgpack_each(&block) - u = MessagePack::Unpacker.new + u = Fluent::Engine.msgpack_factory.unpacker u.feed_each(@data, &block) end end diff --git a/lib/fluent/plugin/exec_util.rb b/lib/fluent/plugin/exec_util.rb index 35fd05853a..7fe262ecf6 100644 --- a/lib/fluent/plugin/exec_util.rb +++ b/lib/fluent/plugin/exec_util.rb @@ -58,7 +58,7 @@ def call(io) class MessagePackParser < Parser def call(io) - @u = MessagePack::Unpacker.new(io) + @u = Fluent::Engine.msgpack_factory.unpacker(io) begin @u.each(&@on_message) rescue EOFError diff --git a/lib/fluent/plugin/filter_record_transformer.rb b/lib/fluent/plugin/filter_record_transformer.rb index f61d4f3598..64375e804c 100644 --- a/lib/fluent/plugin/filter_record_transformer.rb +++ b/lib/fluent/plugin/filter_record_transformer.rb @@ -88,7 +88,7 @@ def filter_stream(tag, es) last_record = record # for debug log new_record = reform(time, record, placeholders) if @renew_time_key && new_record.has_key?(@renew_time_key) - time = new_record[@renew_time_key].to_i + time = EventTime.from_time(Time.at(new_record[@renew_time_key].to_f)) end new_es.add(time, new_record) end diff --git a/lib/fluent/plugin/in_exec.rb b/lib/fluent/plugin/in_exec.rb index fe31908dec..98d0949c02 100644 --- a/lib/fluent/plugin/in_exec.rb +++ b/lib/fluent/plugin/in_exec.rb @@ -66,9 +66,15 @@ def configure(conf) if @time_key if @time_format f = @time_format - @time_parse_proc = Proc.new {|str| Time.strptime(str, f).to_i } + @time_parse_proc = + begin + strptime = Strptime.new(f) + Proc.new { |str| Fluent::EventTime.from_time(strptime.exec(str)) } + rescue + Proc.new {|str| Fluent::EventTime.from_time(Time.strptime(str, f)) } + end else - @time_parse_proc = Proc.new {|str| str.to_i } + @time_parse_proc = Proc.new {|str| Fluent::EventTime.from_time(Time.at(str.to_f)) } end end diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index 2bfcee690d..0811fff259 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -153,8 +153,8 @@ def on_message(msg, chunk_size, source) entries.each {|e| record = e[1] next if record.nil? - time = e[0].to_i - time = (now ||= Engine.now) if time == 0 + time = e[0] + time = (now ||= Engine.now) if time.to_i == 0 es.add(time, record) } router.emit_stream(tag, es) @@ -165,7 +165,7 @@ def on_message(msg, chunk_size, source) record = msg[2] return if record.nil? time = msg[1] - time = Engine.now if time == 0 + time = Engine.now if time.to_i == 0 router.emit(tag, time, record) option = msg[3] end @@ -219,7 +219,7 @@ def on_read(data) else m = method(:on_read_msgpack) @serializer = :to_msgpack.to_proc - @u = MessagePack::Unpacker.new + @u = Fluent::Engine.msgpack_factory.unpacker end (class << self; self; end).module_eval do diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb index 2285100501..17212b4f89 100644 --- a/lib/fluent/plugin/in_http.rb +++ b/lib/fluent/plugin/in_http.rb @@ -144,8 +144,8 @@ def on_request(path_info, params) end end time = if param_time = params['time'] - param_time = param_time.to_i - param_time.zero? ? Engine.now : param_time + param_time = param_time.to_f + param_time.zero? ? Engine.now : Fluent::EventTime.from_time(Time.at(param_time)) else record_time.nil? ? Engine.now : record_time end @@ -191,7 +191,7 @@ def on_request(path_info, params) def parse_params_default(params) record = if msgpack = params['msgpack'] - MessagePack.unpack(msgpack) + Engine.msgpack_factory.unpacker.feed(msgpack).read elsif js = params['json'] JSON.parse(js) else diff --git a/lib/fluent/plugin/in_stream.rb b/lib/fluent/plugin/in_stream.rb index ce2f72e636..f443914ac7 100644 --- a/lib/fluent/plugin/in_stream.rb +++ b/lib/fluent/plugin/in_stream.rb @@ -87,8 +87,8 @@ def on_message(msg) entries.each {|e| record = e[1] next if record.nil? - time = e[0].to_i - time = (now ||= Engine.now) if time == 0 + time = e[0] + time = (now ||= Engine.now) if time.to_i == 0 es.add(time, record) } router.emit_stream(tag, es) @@ -99,7 +99,7 @@ def on_message(msg) return if record.nil? time = msg[1] - time = Engine.now if time == 0 + time = Engine.now if time.to_i == 0 router.emit(tag, time, record) end end @@ -130,7 +130,7 @@ def on_read(data) @y.on_parse_complete = @on_message else m = method(:on_read_msgpack) - @u = MessagePack::Unpacker.new + @u = Fluent::Engine.msgpack_factory.unpacker end (class << self; self; end).module_eval do diff --git a/lib/fluent/plugin/out_exec_filter.rb b/lib/fluent/plugin/out_exec_filter.rb index 4646e4a702..4488234c5d 100644 --- a/lib/fluent/plugin/out_exec_filter.rb +++ b/lib/fluent/plugin/out_exec_filter.rb @@ -120,9 +120,15 @@ def configure(conf) if @out_time_key if f = @out_time_format - @time_parse_proc = Proc.new {|str| Time.strptime(str, f).to_i } + @time_parse_proc = + begin + strptime = Strptime.new(f) + Proc.new { |str| Fluent::EventTime.from_time(strptime.exec(str)) } + rescue + Proc.new {|str| Fluent::EventTime.from_time(Time.strptime(str, f)) } + end else - @time_parse_proc = Proc.new {|str| str.to_i } + @time_parse_proc = Proc.new {|str| Fluent::EventTime.from_time(Time.at(str.to_f)) } end elsif @out_time_format log.warn "out_time_format effects nothing when out_time_key is not specified: #{conf}" diff --git a/lib/fluent/plugin/out_stream.rb b/lib/fluent/plugin/out_stream.rb index f5d51bd79c..41db049c6d 100644 --- a/lib/fluent/plugin/out_stream.rb +++ b/lib/fluent/plugin/out_stream.rb @@ -65,7 +65,7 @@ def write(chunk) chain = NullOutputChain.instance chunk.open {|io| # TODO use MessagePackIoEventStream - u = MessagePack::Unpacker.new(io) + u = Fluent::Engine.msgpack_factory.unpacker(io) begin u.each {|(tag,entries)| es = MultiEventStream.new diff --git a/lib/fluent/process.rb b/lib/fluent/process.rb index 19a31c7e8c..f2b04e9058 100644 --- a/lib/fluent/process.rb +++ b/lib/fluent/process.rb @@ -176,7 +176,7 @@ def input_forward_main(ipr, pid) end def read_event_stream(r, &block) - u = MessagePack::Unpacker.new(r) + u = Fluent::Engine.msgpack_factory.unpacker(r) begin #buf = '' #map = {} diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index 2368e1274c..235d7fb534 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -184,7 +184,7 @@ def handle_emits_error(tag, es, error) log.warn "send an error event stream to @ERROR:", error_info @error_collector.emit_stream(tag, es) else - now = Engine.now + now = Time.now if @suppress_emit_error_log_interval.zero? || now > @next_emit_error_log_time log.warn "emit transaction failed:", error_info log.warn_backtrace diff --git a/lib/fluent/test/base.rb b/lib/fluent/test/base.rb index 43be9002a5..a3f9b77de0 100644 --- a/lib/fluent/test/base.rb +++ b/lib/fluent/test/base.rb @@ -21,12 +21,19 @@ def self.setup engine = Fluent.const_set(:Engine, EngineClass.new).init engine.define_singleton_method(:now=) {|n| - @now = n.to_i + @now = n } engine.define_singleton_method(:now) { @now || super() } + ::Test::Unit::Assertions.module_eval { + def assert_equal_event_time(a, b) + assert_equal(a.sec, b.sec) + assert_equal(a.nsec, b.nsec) + end + } + nil end diff --git a/lib/fluent/test/input_test.rb b/lib/fluent/test/input_test.rb index 062c4a46b0..4a4bad790a 100644 --- a/lib/fluent/test/input_test.rb +++ b/lib/fluent/test/input_test.rb @@ -140,7 +140,10 @@ def run(&block) tag, events = @emit_streams[j] events.each do |time, record| - assert_equal(@expects[i], [tag, time, record]) if @expects + if @expects + assert_equal(@expects[i], [tag, time, record]) + assert_equal_event_time(@expects[i][1], time) if @expects[i][1].is_a?(Fluent::EventTime) + end i += 1 end j += 1 diff --git a/lib/fluent/test/output_test.rb b/lib/fluent/test/output_test.rb index bca6c3e911..4ff78fd484 100644 --- a/lib/fluent/test/output_test.rb +++ b/lib/fluent/test/output_test.rb @@ -37,8 +37,8 @@ def initialize(klass, tag='test', &block) attr_accessor :tag - def emit(record, time=Time.now) - es = OneEventStream.new(time.to_i, record) + def emit(record, time=Engine.now) + es = OneEventStream.new(time, record) chain = TestOutputChain.new @instance.emit(@tag, es, chain) assert_equal 1, chain.called @@ -60,8 +60,8 @@ def @instance.buffer attr_accessor :tag - def emit(record, time=Time.now) - @entries << [time.to_i, record] + def emit(record, time=Engine.now) + @entries << [time, record] self end @@ -109,11 +109,11 @@ def initialize(klass, tag='test', &block) attr_accessor :tag - def emit(record, time=Time.now) + def emit(record, time=Engine.now) slicer = @instance.instance_eval{@time_slicer} - key = slicer.call(time.to_i) + key = slicer.call(time) @entries[key] = [] unless @entries.has_key?(key) - @entries[key] << [time.to_i, record] + @entries[key] << [time, record] self end diff --git a/lib/fluent/time.rb b/lib/fluent/time.rb new file mode 100644 index 0000000000..abf2ede2cd --- /dev/null +++ b/lib/fluent/time.rb @@ -0,0 +1,81 @@ +module Fluent + class EventTime + TYPE = 0 + + def initialize(sec, nsec = 0) + @sec = sec + @nsec = nsec + end + + def ==(other) + if other.is_a?(Fluent::EventTime) + @sec == other.sec + else + @sec == other + end + end + + def sec + @sec + end + + def nsec + @nsec + end + + def to_int + @sec + end + + # for Time.at + def to_r + Rational(@sec * 1_000_000_000 + @nsec, 1_000_000_000) + end + + # for > and others + def coerce(other) + [other, @sec] + end + + def to_s + @sec.to_s + end + + def to_msgpack(io = nil) + @sec.to_msgpack(io) + end + + def to_msgpack_ext + [@sec, @nsec].pack('NN') + end + + def self.from_msgpack_ext(data) + new(*data.unpack('NN')) + end + + def self.from_time(time) + Fluent::EventTime.new(time.to_i, time.nsec) + end + + def self.eq?(a, b) + if a.is_a?(Fluent::EventTime) && b.is_a?(Fluent::EventTime) + a.sec == b.sec && a.nsec == b.nsec + else + a == b + end + end + + def self.now + from_time(Time.now) + end + + def self.parse(*args) + from_time(Time.parse(*args)) + end + + ## TODO: For performance, implement +, -, and so on + def method_missing(name, *args, &block) + @sec.send(name, *args, &block) + end + end +end diff --git a/test/plugin/test_filter_record_transformer.rb b/test/plugin/test_filter_record_transformer.rb index f1af131f0c..66aa23dd1b 100644 --- a/test/plugin/test_filter_record_transformer.rb +++ b/test/plugin/test_filter_record_transformer.rb @@ -95,10 +95,11 @@ def emit(config, msgs = ['']) test 'renew_time_key' do config = %[renew_time_key message] times = [ Time.local(2,2,3,4,5,2010,nil,nil,nil,nil), Time.local(3,2,3,4,5,2010,nil,nil,nil,nil) ] - msgs = times.map{|t| t.to_i.to_s } + msgs = times.map{|t| t.to_f.to_s } es = emit(config, msgs) es.each_with_index do |(time, record), i| assert_equal(times[i].to_i, time) + assert(time.is_a?(Fluent::EventTime)) end end diff --git a/test/plugin/test_filter_stdout.rb b/test/plugin/test_filter_stdout.rb index 761d1083a3..32cc8b005f 100644 --- a/test/plugin/test_filter_stdout.rb +++ b/test/plugin/test_filter_stdout.rb @@ -32,7 +32,7 @@ def emit(d, msg, time) def test_through_record d = create_driver time = Time.now - filtered = emit(d, {'test' => 'test'}, time) + filtered = emit(d, {'test' => 'test'}, Fluent::EventTime.from_time(time)) assert_equal({'test' => 'test'}, filtered) end @@ -59,7 +59,7 @@ def test_configure_output_type def test_output_type_json d = create_driver(CONFIG + "\noutput_type json") time = Time.now - out = capture_log(d) { emit(d, {'test' => 'test'}, time) } + out = capture_log(d) { emit(d, {'test' => 'test'}, Fluent::EventTime.from_time(time)) } assert_equal "#{time.localtime} filter.test: {\"test\":\"test\"}\n", out # NOTE: Float::NAN is not jsonable @@ -71,12 +71,12 @@ def test_output_type_json def test_output_type_hash d = create_driver(CONFIG + "\noutput_type hash") time = Time.now - out = capture_log(d) { emit(d, {'test' => 'test'}, time) } + out = capture_log(d) { emit(d, {'test' => 'test'}, Fluent::EventTime.from_time(time)) } assert_equal "#{time.localtime} filter.test: {\"test\"=>\"test\"}\n", out # NOTE: Float::NAN is not jsonable, but hash string can output it. d = create_driver(CONFIG + "\noutput_type hash") - out = capture_log(d) { emit(d, {'test' => Float::NAN}, time) } + out = capture_log(d) { emit(d, {'test' => Float::NAN}, Fluent::EventTime.from_time(time)) } assert_equal "#{time.localtime} filter.test: {\"test\"=>NaN}\n", out end @@ -84,7 +84,7 @@ def test_output_type_hash def test_include_time_key d = create_driver(CONFIG + "\noutput_type json\ninclude_time_key true\nutc") time = Time.now - message_time = Time.parse("2011-01-02 13:14:15 UTC").to_i + message_time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") out = capture_log(d) { emit(d, {'test' => 'test'}, message_time) } assert_equal "#{time.localtime} filter.test: {\"test\":\"test\",\"time\":\"2011-01-02T13:14:15Z\"}\n", out end @@ -93,7 +93,7 @@ def test_include_time_key def test_format_json d = create_driver(CONFIG + "\nformat json") time = Time.now - out = capture_log(d) { emit(d, {'test' => 'test'}, time) } + out = capture_log(d) { emit(d, {'test' => 'test'}, Fluent::EventTime.from_time(time)) } assert_equal "{\"test\":\"test\"}\n", out end diff --git a/test/plugin/test_in_dummy.rb b/test/plugin/test_in_dummy.rb index 828a5a5601..d2bf159bc0 100644 --- a/test/plugin/test_in_dummy.rb +++ b/test/plugin/test_in_dummy.rb @@ -77,6 +77,7 @@ def create_driver(conf) emits.each do |tag, time, record| assert_equal("dummy", tag) assert_equal({"foo"=>"bar"}, record) + assert(time.is_a?(Fluent::EventTime)) end end diff --git a/test/plugin/test_in_exec.rb b/test/plugin/test_in_exec.rb index 5d5911d474..91677bfdd9 100644 --- a/test/plugin/test_in_exec.rb +++ b/test/plugin/test_in_exec.rb @@ -5,7 +5,7 @@ class ExecInputTest < Test::Unit::TestCase def setup Fluent::Test.setup - @test_time = Time.parse("2011-01-02 13:14:15").to_i + @test_time = Fluent::EventTime.parse("2011-01-02 13:14:15") @script = File.expand_path(File.join(File.dirname(__FILE__), '..', 'scripts', 'exec_script.rb')) end @@ -77,6 +77,7 @@ def test_emit emits = d.emits assert_equal true, emits.length > 0 assert_equal ["tag1", @test_time, {"k1"=>"ok"}], emits[0] + assert_equal_event_time(@test_time, emits[0][1]) end def test_emit_json @@ -89,6 +90,7 @@ def test_emit_json emits = d.emits assert_equal true, emits.length > 0 assert_equal ["tag1", @test_time, {"k1"=>"ok"}], emits[0] + assert_equal_event_time(@test_time, emits[0][1]) end def test_emit_msgpack @@ -101,5 +103,6 @@ def test_emit_msgpack emits = d.emits assert_equal true, emits.length > 0 assert_equal ["tag1", @test_time, {"k1"=>"ok"}], emits[0] + assert_equal_event_time(@test_time, emits[0][1]) end end diff --git a/test/plugin/test_in_forward.rb b/test/plugin/test_in_forward.rb index 329725f19e..10a2fed9d2 100644 --- a/test/plugin/test_in_forward.rb +++ b/test/plugin/test_in_forward.rb @@ -36,7 +36,7 @@ def connect def test_time d = create_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") Fluent::Engine.now = time d.expect_emit "tag1", time, {"a"=>1} @@ -44,7 +44,7 @@ def test_time d.run do d.expected_emits.each {|tag,time,record| - send_data [tag, 0, record].to_msgpack + send_data Fluent::Engine.msgpack_factory.packer.write([tag, 0, record]).to_s } end end @@ -52,6 +52,21 @@ def test_time def test_message d = create_driver + time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + + d.expect_emit "tag1", time, {"a"=>1} + d.expect_emit "tag2", time, {"a"=>2} + + d.run do + d.expected_emits.each {|tag,time,record| + send_data Fluent::Engine.msgpack_factory.packer.write([tag, time, record]).to_s + } + end + end + + def test_message_with_time_as_integer + d = create_driver + time = Time.parse("2011-01-02 13:14:15 UTC").to_i d.expect_emit "tag1", time, {"a"=>1} @@ -59,7 +74,7 @@ def test_message d.run do d.expected_emits.each {|tag,time,record| - send_data [tag, time, record].to_msgpack + send_data Fluent::Engine.msgpack_factory.packer.write([tag, time, record]).to_s } end end @@ -67,6 +82,23 @@ def test_message def test_forward d = create_driver + time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + + d.expect_emit "tag1", time, {"a"=>1} + d.expect_emit "tag1", time, {"a"=>2} + + d.run do + entries = [] + d.expected_emits.each {|tag,time,record| + entries << [time, record] + } + send_data Fluent::Engine.msgpack_factory.packer.write(["tag1", entries]).to_s + end + end + + def test_forward_with_time_as_integer + d = create_driver + time = Time.parse("2011-01-02 13:14:15 UTC").to_i d.expect_emit "tag1", time, {"a"=>1} @@ -77,13 +109,30 @@ def test_forward d.expected_emits.each {|tag,time,record| entries << [time, record] } - send_data ["tag1", entries].to_msgpack + send_data Fluent::Engine.msgpack_factory.packer.write(["tag1", entries]).to_s end end def test_packed_forward d = create_driver + time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + + d.expect_emit "tag1", time, {"a"=>1} + d.expect_emit "tag1", time, {"a"=>2} + + d.run do + entries = '' + d.expected_emits.each {|tag,time,record| + Fluent::Engine.msgpack_factory.packer(entries).write([time, record]).flush + } + send_data Fluent::Engine.msgpack_factory.packer.write(["tag1", entries]).to_s + end + end + + def test_packed_forward_with_time_as_integer + d = create_driver + time = Time.parse("2011-01-02 13:14:15 UTC").to_i d.expect_emit "tag1", time, {"a"=>1} @@ -92,9 +141,9 @@ def test_packed_forward d.run do entries = '' d.expected_emits.each {|tag,time,record| - [time, record].to_msgpack(entries) + Fluent::Engine.msgpack_factory.packer(entries).write([time, record]).flush } - send_data ["tag1", entries].to_msgpack + send_data Fluent::Engine.msgpack_factory.packer.write(["tag1", entries]).to_s end end @@ -119,7 +168,7 @@ def test_send_large_chunk_warning chunk_size_limit 32M ]) - time = Time.parse("2014-04-25 13:14:15 UTC").to_i + time = Fluent::EventTime.parse("2014-04-25 13:14:15 UTC") # generate over 16M chunk str = "X" * 1024 * 1024 @@ -128,7 +177,7 @@ def test_send_large_chunk_warning assert chunk.size < (32 * 1024 * 1024) d.run do - MessagePack::Unpacker.new.feed_each(chunk) do |obj| + Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj| d.instance.send(:on_message, obj, chunk.size, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000") end end @@ -150,14 +199,14 @@ def test_send_large_chunk_only_warning d = create_driver(CONFIG + %[ chunk_size_warn_limit 16M ]) - time = Time.parse("2014-04-25 13:14:15 UTC").to_i + time = Fluent::EventTime.parse("2014-04-25 13:14:15 UTC") # generate over 16M chunk str = "X" * 1024 * 1024 chunk = [ "test.tag", (0...16).map{|i| [time + i, {"data" => str}] } ].to_msgpack d.run do - MessagePack::Unpacker.new.feed_each(chunk) do |obj| + Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj| d.instance.send(:on_message, obj, chunk.size, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000") end end @@ -175,7 +224,7 @@ def test_send_large_chunk_limit chunk_size_limit 32M ]) - time = Time.parse("2014-04-25 13:14:15 UTC").to_i + time = Fluent::EventTime.parse("2014-04-25 13:14:15 UTC") # generate over 32M chunk str = "X" * 1024 * 1024 @@ -184,7 +233,7 @@ def test_send_large_chunk_limit # d.run => send_data d.run do - MessagePack::Unpacker.new.feed_each(chunk) do |obj| + Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj| d.instance.send(:on_message, obj, chunk.size, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000") end end @@ -223,7 +272,7 @@ def test_send_broken_chunk(data) def test_respond_to_message_requiring_ack d = create_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") events = [ ["tag1", time, {"a"=>1}], @@ -249,7 +298,7 @@ def test_respond_to_message_requiring_ack def test_respond_to_forward_requiring_ack d = create_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") events = [ ["tag1", time, {"a"=>1}], @@ -276,7 +325,7 @@ def test_respond_to_forward_requiring_ack def test_respond_to_packed_forward_requiring_ack d = create_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") events = [ ["tag1", time, {"a"=>1}], @@ -329,7 +378,7 @@ def test_respond_to_message_json_requiring_ack def test_not_respond_to_message_not_requiring_ack d = create_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") events = [ ["tag1", time, {"a"=>1}], @@ -350,7 +399,7 @@ def test_not_respond_to_message_not_requiring_ack def test_not_respond_to_forward_not_requiring_ack d = create_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") events = [ ["tag1", time, {"a"=>1}], @@ -373,7 +422,7 @@ def test_not_respond_to_forward_not_requiring_ack def test_not_respond_to_packed_forward_not_requiring_ack d = create_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") events = [ ["tag1", time, {"a"=>1}], diff --git a/test/plugin/test_in_gc_stat.rb b/test/plugin/test_in_gc_stat.rb index 3fc96d7f8e..ff0638251f 100644 --- a/test/plugin/test_in_gc_stat.rb +++ b/test/plugin/test_in_gc_stat.rb @@ -33,5 +33,6 @@ def test_emit emits = d.emits assert(emits.length > 0) assert_equal(stat, emits[0][2]) + assert(emits[0][1].is_a?(Fluent::EventTime)) end end diff --git a/test/plugin/test_in_http.rb b/test/plugin/test_in_http.rb index 1861f0cb63..0ecc0706d4 100644 --- a/test/plugin/test_in_http.rb +++ b/test/plugin/test_in_http.rb @@ -32,7 +32,7 @@ def test_configure def test_time d = create_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) Fluent::Engine.now = time d.expect_emit "tag1", time, {"a"=>1} @@ -46,10 +46,26 @@ def test_time end end + def test_time_as_float + d = create_driver + + float_time = Time.parse("2011-01-02 13:14:15.123 UTC").to_f + time = Fluent::EventTime.from_time(Time.at(float_time)) + + d.expect_emit "tag1", time, {"a"=>1} + + d.run do + d.expected_emits.each {|tag,time,record| + res = post("/#{tag}", {"json"=>record.to_json, "time"=>float_time.to_s}) + assert_equal "200", res.code + } + end + end + def test_json d = create_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) d.expect_emit "tag1", time, {"a"=>1} d.expect_emit "tag2", time, {"a"=>2} @@ -69,7 +85,7 @@ def test_json def test_multi_json d = create_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) events = [{"a"=>1},{"a"=>2}] tag = "tag1" @@ -138,7 +154,7 @@ def test_multi_json_with_add_http_headers def test_json_with_add_http_headers d = create_driver(CONFIG + "add_http_headers true") - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) records = [["tag1", time, {"a"=>1}], ["tag2", time, {"a"=>2}]] @@ -158,7 +174,7 @@ def test_json_with_add_http_headers def test_application_json d = create_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) d.expect_emit "tag1", time, {"a"=>1} d.expect_emit "tag2", time, {"a"=>2} @@ -174,7 +190,7 @@ def test_application_json def test_msgpack d = create_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) d.expect_emit "tag1", time, {"a"=>1} d.expect_emit "tag2", time, {"a"=>2} @@ -190,7 +206,7 @@ def test_msgpack def test_multi_msgpack d = create_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) events = [{"a"=>1},{"a"=>2}] tag = "tag1" @@ -212,7 +228,7 @@ def test_with_regexp types field_1:integer ]) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) d.expect_emit "tag1", time, {"field_1" => 1, "field_2" => 'str'} d.expect_emit "tag2", time, {"field_1" => 2, "field_2" => 'str'} @@ -236,7 +252,7 @@ def test_with_csv keys foo,bar ]) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) d.expect_emit "tag1", time, {"foo" => "1", "bar" => 'st"r'} d.expect_emit "tag2", time, {"foo" => "2", "bar" => 'str'} @@ -254,7 +270,7 @@ def test_resonse_with_empty_img d = create_driver(CONFIG + "respond_with_empty_img true") assert_equal true, d.instance.respond_with_empty_img - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = Fluent::EventTime.new(Time.parse("2011-01-02 13:14:15 UTC").to_i) d.expect_emit "tag1", time, {"a"=>1} d.expect_emit "tag2", time, {"a"=>2} diff --git a/test/plugin/test_in_object_space.rb b/test/plugin/test_in_object_space.rb index 57a4a0cbed..fc370e9529 100644 --- a/test/plugin/test_in_object_space.rb +++ b/test/plugin/test_in_object_space.rb @@ -32,8 +32,6 @@ def test_configure def test_emit d = create_driver - time = Time.parse("2011-01-02 13:14:15").to_i - d.expected_emits_length = 2 d.run @@ -43,6 +41,7 @@ def test_emit emits.each { |tag, time, record| assert_equal d.instance.tag, tag assert_equal d.instance.top, record.keys.size + assert(time.is_a?(Fluent::EventTime)) } end end diff --git a/test/plugin/test_in_status.rb b/test/plugin/test_in_status.rb index a3b96a2320..5ca754ff3f 100644 --- a/test/plugin/test_in_status.rb +++ b/test/plugin/test_in_status.rb @@ -34,5 +34,6 @@ def test_emit emits = d.emits assert(emits.length > 0) assert_equal({"answer" => "42"}, emits[0][2]) + assert(emits[0][1].is_a?(Fluent::EventTime)) end end diff --git a/test/plugin/test_in_stream.rb b/test/plugin/test_in_stream.rb index 0ac5a4ae4c..2501832f17 100644 --- a/test/plugin/test_in_stream.rb +++ b/test/plugin/test_in_stream.rb @@ -9,7 +9,7 @@ def setup def test_time d = create_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") Fluent::Engine.now = time d.expect_emit "tag1", time, {"a"=>1} @@ -17,7 +17,7 @@ def test_time d.run do d.expected_emits.each {|tag,time,record| - send_data [tag, 0, record].to_msgpack + send_data Fluent::Engine.msgpack_factory.packer.write([tag, 0, record]).to_s } end end @@ -25,14 +25,14 @@ def test_time def test_message d = create_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") d.expect_emit "tag1", time, {"a"=>1} d.expect_emit "tag2", time, {"a"=>2} d.run do d.expected_emits.each {|tag,time,record| - send_data [tag, time, record].to_msgpack + send_data Fluent::Engine.msgpack_factory.packer.write([tag, time, record]).to_s } end end @@ -40,7 +40,7 @@ def test_message def test_forward d = create_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") d.expect_emit "tag1", time, {"a"=>1} d.expect_emit "tag1", time, {"a"=>2} @@ -50,14 +50,14 @@ def test_forward d.expected_emits.each {|tag,time,record| entries << [time, record] } - send_data ["tag1", entries].to_msgpack + send_data Fluent::Engine.msgpack_factory.packer.write(["tag1", entries]).to_s end end def test_packed_forward d = create_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") d.expect_emit "tag1", time, {"a"=>1} d.expect_emit "tag1", time, {"a"=>2} @@ -65,9 +65,9 @@ def test_packed_forward d.run do entries = '' d.expected_emits.each {|tag,time,record| - [time, record].to_msgpack(entries) + Fluent::Engine.msgpack_factory.packer(entries).write([time, record]).flush } - send_data ["tag1", entries].to_msgpack + send_data Fluent::Engine.msgpack_factory.packer.write(["tag1", entries]).to_s end end diff --git a/test/plugin/test_in_syslog.rb b/test/plugin/test_in_syslog.rb index 3e1a20e106..d65c4afd32 100755 --- a/test/plugin/test_in_syslog.rb +++ b/test/plugin/test_in_syslog.rb @@ -43,8 +43,8 @@ def test_time_format d = create_driver(v) tests = [ - {'msg' => '<6>Sep 11 00:00:00 localhost logger: foo', 'expected' => Time.strptime('Sep 11 00:00:00', '%b %d %H:%M:%S').to_i}, - {'msg' => '<6>Sep 1 00:00:00 localhost logger: foo', 'expected' => Time.strptime('Sep 1 00:00:00', '%b %d %H:%M:%S').to_i}, + {'msg' => '<6>Sep 11 00:00:00 localhost logger: foo', 'expected' => Fluent::EventTime.from_time(Time.strptime('Sep 11 00:00:00', '%b %d %H:%M:%S'))}, + {'msg' => '<6>Sep 1 00:00:00 localhost logger: foo', 'expected' => Fluent::EventTime.from_time(Time.strptime('Sep 1 00:00:00', '%b %d %H:%M:%S'))}, ] d.run do @@ -58,7 +58,7 @@ def test_time_format emits = d.emits emits.each_index {|i| - assert_equal(tests[i]['expected'], emits[i][1]) + assert_equal_event_time(tests[i]['expected'], emits[i][1]) } } end diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index 34008b4aa2..fe2736f533 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -77,6 +77,8 @@ def test_emit assert_equal(true, emits.length > 0) assert_equal({"message" => "test3"}, emits[0][2]) assert_equal({"message" => "test4"}, emits[1][2]) + assert(emits[0][1].is_a?(Fluent::EventTime)) + assert(emits[1][1].is_a?(Fluent::EventTime)) assert_equal(1, d.emit_streams.size) end diff --git a/test/plugin/test_in_tcp.rb b/test/plugin/test_in_tcp.rb index 60280ddaae..0b89da8c9e 100755 --- a/test/plugin/test_in_tcp.rb +++ b/test/plugin/test_in_tcp.rb @@ -83,6 +83,7 @@ def compare_test_result(emits, tests) assert_equal(2, emits.size) emits.each_index {|i| assert_equal(tests[i]['expected'], emits[i][2]['message']) + assert(emits[i][1].is_a?(Fluent::EventTime)) } end end diff --git a/test/plugin/test_in_udp.rb b/test/plugin/test_in_udp.rb index e2b2166deb..1d4d46284f 100755 --- a/test/plugin/test_in_udp.rb +++ b/test/plugin/test_in_udp.rb @@ -44,8 +44,8 @@ def test_time_format d = create_driver(v) tests = [ - {'msg' => '[Sep 11 00:00:00] localhost logger: foo', 'expected' => Time.strptime('Sep 11 00:00:00', '%b %d %H:%M:%S').to_i}, - {'msg' => '[Sep 1 00:00:00] localhost logger: foo', 'expected' => Time.strptime('Sep 1 00:00:00', '%b %d %H:%M:%S').to_i}, + {'msg' => '[Sep 11 00:00:00] localhost logger: foo', 'expected' => Fluent::EventTime.from_time(Time.strptime('Sep 11 00:00:00', '%b %d %H:%M:%S'))}, + {'msg' => '[Sep 1 00:00:00] localhost logger: foo', 'expected' => Fluent::EventTime.from_time(Time.strptime('Sep 1 00:00:00', '%b %d %H:%M:%S'))}, ] d.run do @@ -59,7 +59,7 @@ def test_time_format emits = d.emits emits.each_index {|i| - assert_equal(tests[i]['expected'], emits[i][1]) + assert_equal_event_time(tests[i]['expected'], emits[i][1]) } } end @@ -99,6 +99,7 @@ def compare_test_result(emits, tests) assert_equal(2, emits.size) emits.each_index {|i| assert_equal(tests[i]['expected'], emits[i][2]['message']) + assert(emits[i][1].is_a?(Fluent::EventTime)) } end end diff --git a/test/plugin/test_out_copy.rb b/test/plugin/test_out_copy.rb index 4ff5523019..04725d198c 100644 --- a/test/plugin/test_out_copy.rb +++ b/test/plugin/test_out_copy.rb @@ -86,7 +86,7 @@ def test_msgpack_es_emit_bug es = if defined?(MessagePack::Packer) time = Time.parse("2013-05-26 06:37:22 UTC").to_i - packer = MessagePack::Packer.new + packer = Fluent::Engine.msgpack_factory.packer packer.pack([time, {"a" => 1}]) packer.pack([time, {"a" => 2}]) Fluent::MessagePackEventStream.new(packer.to_s) diff --git a/test/plugin/test_out_exec.rb b/test/plugin/test_out_exec.rb index 361eba63f8..3b8a8a25af 100644 --- a/test/plugin/test_out_exec.rb +++ b/test/plugin/test_out_exec.rb @@ -88,6 +88,28 @@ def test_format_msgpack d.run end + def test_format_time + config = %[ + keys "time,tag,k1" + tag_key "tag" + time_key "time" + time_format %Y-%m-%d %H:%M:%S.%3N + ] + d = create_driver(config) + + time = Fluent::EventTime::from_time(Time.parse("2011-01-02 13:14:15.123")) + tests = [{"k1"=>"v1","kx"=>"vx"}, {"k1"=>"v2","kx"=>"vx"}] + + tests.each { |test| + d.emit(test, time) + } + + d.expect_format %[2011-01-02 13:14:15.123\ttest\tv1\n] + d.expect_format %[2011-01-02 13:14:15.123\ttest\tv2\n] + + d.run + end + def test_write d = create_driver time, tests = create_test_case diff --git a/test/plugin/test_out_exec_filter.rb b/test/plugin/test_out_exec_filter.rb index 81b14cc5ee..00d888dd5c 100644 --- a/test/plugin/test_out_exec_filter.rb +++ b/test/plugin/test_out_exec_filter.rb @@ -69,7 +69,7 @@ def test_configure def test_emit_1 d = create_driver - time = Time.parse("2011-01-02 13:14:15").to_i + time = Fluent::EventTime.parse("2011-01-02 13:14:15") d.run do d.emit({"k1"=>1}, time) @@ -79,7 +79,9 @@ def test_emit_1 emits = d.emits assert_equal 2, emits.length assert_equal ["test", time, {"k2"=>"1"}], emits[0] + assert_equal_event_time time, emits[0][1] assert_equal ["test", time, {"k2"=>"2"}], emits[1] + assert_equal_event_time time, emits[1][1] end def test_emit_2 @@ -92,7 +94,7 @@ def test_emit_2 num_children 3 ] - time = Time.parse("2011-01-02 13:14:15").to_i + time = Fluent::EventTime.parse("2011-01-02 13:14:15") d.run do d.emit({"k1"=>1}, time) @@ -102,7 +104,9 @@ def test_emit_2 emits = d.emits assert_equal 2, emits.length assert_equal ["xxx", time, {"k2"=>"1"}], emits[0] + assert_equal_event_time time, emits[0][1] assert_equal ["xxx", time, {"k2"=>"2"}], emits[1] + assert_equal_event_time time, emits[1][1] end def test_emit_3 @@ -115,7 +119,7 @@ def test_emit_3 num_children 3 ] - time = Time.parse("2011-01-02 13:14:15").to_i + time = Fluent::EventTime.parse("2011-01-02 13:14:15") d.run do d.emit({"val1"=>"sed-ed value foo"}, time) @@ -125,6 +129,7 @@ def test_emit_3 emits = d.emits assert_equal 1, emits.length assert_equal ["xxx", time, {"val2"=>"sed-ed value foo"}], emits[0] + assert_equal_event_time time, emits[0][1] d = create_driver %[ command sed #{sed_unbuffered_option} -l -e s/foo/bar/ @@ -135,7 +140,7 @@ def test_emit_3 num_children 3 ] - time = Time.parse("2011-01-02 13:14:15").to_i + time = Fluent::EventTime.parse("2011-01-02 13:14:15") d.run do d.emit({"val1"=>"sed-ed value foo"}, time) @@ -145,7 +150,9 @@ def test_emit_3 emits = d.emits assert_equal 2, emits.length assert_equal ["xxx", time, {"val2"=>"sed-ed value bar"}], emits[0] + assert_equal_event_time time, emits[0][1] assert_equal ["xxx", time, {"val2"=>"sed-ed value poo"}], emits[1] + assert_equal_event_time time, emits[1][1] end def test_emit_4 @@ -160,7 +167,7 @@ def test_emit_4 num_children 3 ], 'input.test') - time = Time.parse("2011-01-02 13:14:15").to_i + time = Fluent::EventTime.parse("2011-01-02 13:14:15") d.run do d.emit({"val1"=>"sed-ed value foo"}, time) @@ -170,7 +177,9 @@ def test_emit_4 emits = d.emits assert_equal 2, emits.length assert_equal ["output.test", time, {"val2"=>"sed-ed value bar"}], emits[0] + assert_equal_event_time time, emits[0][1] assert_equal ["output.test", time, {"val2"=>"sed-ed value poo"}], emits[1] + assert_equal_event_time time, emits[1][1] end def test_json_1 @@ -182,7 +191,7 @@ def test_json_1 tag_key tag ], 'input.test') - time = Time.parse("2011-01-02 13:14:15").to_i + time = Fluent::EventTime.parse("2011-01-02 13:14:15") d.run do d.emit({"message"=>%[{"time":#{time},"tag":"t1","k1":"v1"}]}, time+10) @@ -191,6 +200,52 @@ def test_json_1 emits = d.emits assert_equal 1, emits.length assert_equal ["t1", time, {"k1"=>"v1"}], emits[0] + assert_equal_event_time time, emits[0][1] + end + + def test_json_with_float_time + d = create_driver(%[ + command cat + in_keys message + out_format json + time_key time + tag_key tag + ], 'input.test') + + float_time = Time.parse("2011-01-02 13:14:15").to_f + time = Fluent::EventTime.from_time(Time.at(float_time)) + + d.run do + d.emit({"message"=>%[{"time":#{float_time},"tag":"t1","k1":"v1"}]}, time+10) + end + + emits = d.emits + assert_equal 1, emits.length + assert_equal ["t1", time, {"k1"=>"v1"}], emits[0] + assert_equal_event_time time, emits[0][1] + end + + def test_json_with_time_format + d = create_driver(%[ + command cat + in_keys message + out_format json + time_key time + time_format %d/%b/%Y %H:%M:%S.%N %z + tag_key tag + ], 'input.test') + + time_str = "28/Feb/2013 12:00:00.123456789 +0900" + time = Fluent::EventTime.from_time(Time.strptime(time_str, "%d/%b/%Y %H:%M:%S.%N %z")) + + d.run do + d.emit({"message"=>%[{"time":"#{time_str}","tag":"t1","k1":"v1"}]}, time+10) + end + + emits = d.emits + assert_equal 1, emits.length + assert_equal ["t1", time, {"k1"=>"v1"}], emits[0] + assert_equal_event_time time, emits[0][1] end end diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index edafd190e5..6f957797c5 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -106,6 +106,75 @@ def test_wait_response_timeout_config assert_equal 2, d.instance.ack_response_timeout end + def test_send_with_time_as_integer + target_input_driver = create_target_input_driver + + d = create_driver(CONFIG + %[flush_interval 1s]) + + time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + + records = [ + {"a" => 1}, + {"a" => 2} + ] + d.register_run_post_condition do + d.instance.responses.length == 1 + end + + target_input_driver.run do + d.run do + records.each do |record| + d.emit record, time + end + end + end + + emits = target_input_driver.emits + assert_equal ['test', time, records[0]], emits[0] + assert_equal ['test', time, records[1]], emits[1] + assert(emits[0][1].is_a?(Integer)) + assert(emits[1][1].is_a?(Integer)) + + assert_equal [nil], d.instance.responses # not attempt to receive responses, so nil is returned + assert_empty d.instance.exceptions + end + + def test_send_without_time_as_integer + target_input_driver = create_target_input_driver + + d = create_driver(CONFIG + %[ + flush_interval 1s + time_as_integer false + ]) + + time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + + records = [ + {"a" => 1}, + {"a" => 2} + ] + d.register_run_post_condition do + d.instance.responses.length == 1 + end + + target_input_driver.run do + d.run do + records.each do |record| + d.emit record, time + end + end + end + + emits = target_input_driver.emits + assert_equal ['test', time, records[0]], emits[0] + assert_equal ['test', time, records[1]], emits[1] + assert_equal_event_time(time, emits[0][1]) + assert_equal_event_time(time, emits[1][1]) + + assert_equal [nil], d.instance.responses # not attempt to receive responses, so nil is returned + assert_empty d.instance.exceptions + end + def test_send_to_a_node_supporting_responses target_input_driver = create_target_input_driver(true) diff --git a/test/plugin/test_out_stdout.rb b/test/plugin/test_out_stdout.rb index 9b94793f69..ca5ef0a3c0 100644 --- a/test/plugin/test_out_stdout.rb +++ b/test/plugin/test_out_stdout.rb @@ -33,7 +33,7 @@ def test_configure_output_type def test_emit_json d = create_driver(CONFIG + "\noutput_type json") time = Time.now - out = capture_log { d.emit({'test' => 'test'}, time) } + out = capture_log { d.emit({'test' => 'test'}, Fluent::EventTime.from_time(time)) } assert_equal "#{time.localtime} test: {\"test\":\"test\"}\n", out # NOTE: Float::NAN is not jsonable @@ -47,7 +47,7 @@ def test_emit_hash assert_equal "#{time.localtime} test: {\"test\"=>\"test\"}\n", out # NOTE: Float::NAN is not jsonable, but hash string can output it. - out = capture_log { d.emit({'test' => Float::NAN}, time) } + out = capture_log { d.emit({'test' => Float::NAN}, Fluent::EventTime.from_time(time)) } assert_equal "#{time.localtime} test: {\"test\"=>NaN}\n", out end diff --git a/test/plugin/test_out_stream.rb b/test/plugin/test_out_stream.rb index 499599ceef..b105411a5c 100644 --- a/test/plugin/test_out_stream.rb +++ b/test/plugin/test_out_stream.rb @@ -22,6 +22,23 @@ def test_write assert_equal(expect, result) end + def test_write_event_time + d = create_driver + + time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + d.emit({"a"=>1}, time) + d.emit({"a"=>2}, time) + + expect = ["test", + Fluent::Engine.msgpack_factory.packer.write([time,{"a"=>1}]).to_s + + Fluent::Engine.msgpack_factory.packer.write([time,{"a"=>2}]).to_s + ] + expect = Fluent::Engine.msgpack_factory.packer.write(expect).to_s + + result = d.run + assert_equal(expect, result) + end + def create_driver(klass, conf) Fluent::Test::BufferedOutputTestDriver.new(klass) do def write(chunk) diff --git a/test/test_event.rb b/test/test_event.rb index 833ba6893f..e7489c2d4d 100644 --- a/test/test_event.rb +++ b/test/test_event.rb @@ -31,7 +31,7 @@ def setup test 'to_msgpack_stream' do stream = @es.to_msgpack_stream - MessagePack::Unpacker.new.feed_each(stream) { |time, record| + Fluent::Engine.msgpack_factory.unpacker.feed_each(stream) { |time, record| assert_equal @time, time assert_equal @record, record } @@ -42,7 +42,8 @@ class ArrayEventStreamTest < ::Test::Unit::TestCase include Fluent def setup - @times = [Engine.now, Engine.now + 1] + time = Engine.now + @times = [Fluent::EventTime.new(time.sec), Fluent::EventTime.new(time.sec + 1)] @records = [{'k' => 'v1', 'n' => 1}, {'k' => 'v2', 'n' => 2}] @es = ArrayEventStream.new(@times.zip(@records)) end @@ -74,7 +75,7 @@ def setup test 'to_msgpack_stream' do i = 0 stream = @es.to_msgpack_stream - MessagePack::Unpacker.new.feed_each(stream) { |time, record| + Fluent::Engine.msgpack_factory.unpacker.feed_each(stream) { |time, record| assert_equal @times[i], time assert_equal @records[i], record i += 1 @@ -86,7 +87,8 @@ class MultiEventStreamTest < ::Test::Unit::TestCase include Fluent def setup - @times = [Engine.now, Engine.now + 1] + time = Engine.now + @times = [Fluent::EventTime.new(time.sec), Fluent::EventTime.new(time.sec + 1)] @records = [{'k' => 'v1', 'n' => 1}, {'k' => 'v2', 'n' => 2}] @es = MultiEventStream.new @times.zip(@records).each { |time, record| @@ -121,7 +123,7 @@ def setup test 'to_msgpack_stream' do i = 0 stream = @es.to_msgpack_stream - MessagePack::Unpacker.new.feed_each(stream) { |time, record| + Fluent::Engine.msgpack_factory.unpacker.feed_each(stream) { |time, record| assert_equal @times[i], time assert_equal @records[i], record i += 1 @@ -133,8 +135,9 @@ class MessagePackEventStreamTest < ::Test::Unit::TestCase include Fluent def setup - pk = MessagePack::Packer.new - @times = [Engine.now, Engine.now + 1] + pk = Fluent::Engine.msgpack_factory.packer + time = Engine.now + @times = [Fluent::EventTime.new(time.sec), Fluent::EventTime.new(time.sec + 1)] @records = [{'k' => 'v1', 'n' => 1}, {'k' => 'v2', 'n' => 2}] @times.zip(@records).each { |time, record| pk.write([time, record]) @@ -158,7 +161,7 @@ def setup test 'to_msgpack_stream' do i = 0 stream = @es.to_msgpack_stream - MessagePack::Unpacker.new.feed_each(stream) { |time, record| + Fluent::Engine.msgpack_factory.unpacker.feed_each(stream) { |time, record| assert_equal @times[i], time assert_equal @records[i], record i += 1 diff --git a/test/test_event_time.rb b/test/test_event_time.rb new file mode 100644 index 0000000000..9dee908a6d --- /dev/null +++ b/test/test_event_time.rb @@ -0,0 +1,157 @@ +require_relative 'helper' +require 'timecop' + +class EventTimeTest < Test::Unit::TestCase + setup do + @now = Time.now + Timecop.freeze(@now) + end + + teardown do + Timecop.return + end + + test '#sec' do + assert_equal(1, Fluent::EventTime.new(1, 2).sec) + end + + test '#nsec' do + assert_equal(2, Fluent::EventTime.new(1, 2).nsec) + assert_equal(0, Fluent::EventTime.new(1).nsec) + end + + test '#to_int' do + assert_equal(1, Fluent::EventTime.new(1, 2).to_int) + end + + test '#to_r' do + assert_equal(Rational(1_000_000_002, 1_000_000_000), Fluent::EventTime.new(1, 2).to_r) + end + + test '#to_s' do + time = Fluent::EventTime.new(100) + assert_equal('100', time.to_s) + assert_equal('100', "#{time}") + end + + test '.from_time' do + sec = 1000 + usec = 2 + time = Fluent::EventTime.from_time(Time.at(sec, usec)) + assert_equal(time.sec, sec) + assert_equal(time.nsec, usec * 1000) + end + + test 'now' do + assert_equal(@now.to_i, Fluent::EventTime.now.sec) + assert_equal(@now.nsec, Fluent::EventTime.now.nsec) + end + + test 'parse' do + assert_equal(Time.parse("2011-01-02 13:14:15").to_i, Fluent::EventTime.parse("2011-01-02 13:14:15").sec) + assert_equal(Time.parse("2011-01-02 13:14:15").nsec, Fluent::EventTime.parse("2011-01-02 13:14:15").nsec) + end + + test 'eq?' do + assert(Fluent::EventTime.eq?(Fluent::EventTime.new(1, 2), Fluent::EventTime.new(1, 2))) + refute(Fluent::EventTime.eq?(Fluent::EventTime.new(1, 2), Fluent::EventTime.new(1, 3))) + refute(Fluent::EventTime.eq?(Fluent::EventTime.new(1, 2), Fluent::EventTime.new(3, 2))) + refute(Fluent::EventTime.eq?(Fluent::EventTime.new(1, 2), Fluent::EventTime.new(3, 4))) + + assert(Fluent::EventTime.eq?(Fluent::EventTime.new(1, 2), 1)) + refute(Fluent::EventTime.eq?(Fluent::EventTime.new(1, 2), 2)) + + assert(Fluent::EventTime.eq?(1, Fluent::EventTime.new(1, 2))) + refute(Fluent::EventTime.eq?(2, Fluent::EventTime.new(1, 2))) + end + + test '==' do + assert(Fluent::EventTime.new(1, 2) == Fluent::EventTime.new(1, 2)) + assert(Fluent::EventTime.new(1, 2) == Fluent::EventTime.new(1, 3)) + refute(Fluent::EventTime.new(1, 2) == Fluent::EventTime.new(3, 2)) + refute(Fluent::EventTime.new(1, 2) == Fluent::EventTime.new(3, 4)) + + assert(Fluent::EventTime.new(1, 2) == 1) + refute(Fluent::EventTime.new(1, 2) == 2) + + assert(1 == Fluent::EventTime.new(1, 2)) + refute(2 == Fluent::EventTime.new(1, 2)) + end + + test '+' do + assert_equal(4, Fluent::EventTime.new(1, 2) + Fluent::EventTime.new(3, 4)) + assert_equal(6, Fluent::EventTime.new(1, 2) + 5) + assert_equal(6, 5 + Fluent::EventTime.new(1, 2)) + end + + test '-' do + assert_equal(-2, Fluent::EventTime.new(1, 2) - Fluent::EventTime.new(3, 4)) + assert_equal(-4, Fluent::EventTime.new(1, 2) - 5) + assert_equal(4, 5 - Fluent::EventTime.new(1, 2)) + end + + test '>' do + assert(Fluent::EventTime.new(2) > Fluent::EventTime.new(1)) + refute(Fluent::EventTime.new(1) > Fluent::EventTime.new(1)) + refute(Fluent::EventTime.new(1) > Fluent::EventTime.new(2)) + + assert(Fluent::EventTime.new(2) > 1) + refute(Fluent::EventTime.new(1) > 1) + refute(Fluent::EventTime.new(1) > 2) + + assert(2 > Fluent::EventTime.new(1)) + refute(1 > Fluent::EventTime.new(1)) + refute(1 > Fluent::EventTime.new(2)) + end + + test '>=' do + assert(Fluent::EventTime.new(2) >= Fluent::EventTime.new(1)) + assert(Fluent::EventTime.new(1) >= Fluent::EventTime.new(1)) + refute(Fluent::EventTime.new(1) >= Fluent::EventTime.new(2)) + + assert(Fluent::EventTime.new(2) >= 1) + assert(Fluent::EventTime.new(1) >= 1) + refute(Fluent::EventTime.new(1) >= 2) + + assert(2 >= Fluent::EventTime.new(1)) + assert(1 >= Fluent::EventTime.new(1)) + refute(1 >= Fluent::EventTime.new(2)) + end + + test '<' do + assert(Fluent::EventTime.new(1) < Fluent::EventTime.new(2)) + refute(Fluent::EventTime.new(1) < Fluent::EventTime.new(1)) + refute(Fluent::EventTime.new(2) < Fluent::EventTime.new(1)) + + assert(Fluent::EventTime.new(1) < 2) + refute(Fluent::EventTime.new(1) < 1) + refute(Fluent::EventTime.new(2) < 1) + + assert(1 < Fluent::EventTime.new(2)) + refute(1 < Fluent::EventTime.new(1)) + refute(2 < Fluent::EventTime.new(1)) + end + + test '=<' do + assert(Fluent::EventTime.new(1) <= Fluent::EventTime.new(2)) + assert(Fluent::EventTime.new(1) <= Fluent::EventTime.new(1)) + refute(Fluent::EventTime.new(2) <= Fluent::EventTime.new(1)) + + assert(Fluent::EventTime.new(1) <= 2) + assert(Fluent::EventTime.new(1) <= 1) + refute(Fluent::EventTime.new(2) <= 1) + + assert(1 <= Fluent::EventTime.new(2)) + assert(1 <= Fluent::EventTime.new(1)) + refute(2 <= Fluent::EventTime.new(1)) + end + + test 'Time.at' do + sec = 1000 + nsec = 2000 + ntime = Fluent::EventTime.new(sec, nsec) + time = Time.at(ntime) + assert_equal(sec, time.to_i) + assert_equal(nsec, time.nsec) + end +end diff --git a/test/test_formatter.rb b/test/test_formatter.rb index 38f26c6a92..e77a5636cc 100644 --- a/test/test_formatter.rb +++ b/test/test_formatter.rb @@ -563,6 +563,12 @@ def test_specific_localtime_timezone assert_equal("20140926 1400-1000", format(@fmt, true, "-10")) end end + + def test_format_with_subsec + time = Fluent::EventTime.new(@time) + formatter = Fluent::TimeFormatter.new("%Y%m%d %H%M.%N", false, nil) + assert_equal("20140927 0000.000000000", formatter.format(time)) + end end class TimeConfigTest < ::Test::Unit::TestCase diff --git a/test/test_output.rb b/test/test_output.rb index 351e6ed284..6066b5bc81 100644 --- a/test/test_output.rb +++ b/test/test_output.rb @@ -203,6 +203,26 @@ def test_secondary end end + class ObjectBufferedOutputTest < ::Test::Unit::TestCase + include FluentOutputTest + + def setup + Fluent::Test.setup + end + + CONFIG = %[] + + def create_driver(conf=CONFIG) + Fluent::Test::OutputTestDriver.new(Fluent::ObjectBufferedOutput).configure(conf, true) + end + + def test_configure + # default + d = create_driver + assert_equal true, d.instance.time_as_integer + end + end + class TimeSlicedOutputTest < ::Test::Unit::TestCase include FluentOutputTest include FlexMock::TestCase diff --git a/test/test_parser.rb b/test/test_parser.rb index ead7d095fc..9e252f0a5b 100644 --- a/test/test_parser.rb +++ b/test/test_parser.rb @@ -5,11 +5,15 @@ module ParserTest include Fluent + def setup + Fluent::Test.setup + end + def str2time(str_time, format = nil) if format - Time.strptime(str_time, format).to_i + Fluent::EventTime.from_time(Time.strptime(str_time, format)) else - Time.parse(str_time).to_i + Fluent::EventTime.parse(str_time) end end @@ -45,6 +49,8 @@ class TimeParserTest < ::Test::Unit::TestCase def test_call_with_parse parser = TextParser::TimeParser.new(nil) + assert(parser.parse('2013-09-18 12:00:00 +0900').is_a?(Fluent::EventTime)) + time = str2time('2013-09-18 12:00:00 +0900') assert_equal(time, parser.parse('2013-09-18 12:00:00 +0900')) end @@ -52,10 +58,21 @@ def test_call_with_parse def test_parse_with_strptime parser = TextParser::TimeParser.new('%d/%b/%Y:%H:%M:%S %z') + assert(parser.parse('28/Feb/2013:12:00:00 +0900').is_a?(Fluent::EventTime)) + time = str2time('28/Feb/2013:12:00:00 +0900', '%d/%b/%Y:%H:%M:%S %z') assert_equal(time, parser.parse('28/Feb/2013:12:00:00 +0900')) end + def test_parse_nsec_with_strptime + parser = TextParser::TimeParser.new('%d/%b/%Y:%H:%M:%S:%N %z') + + assert(parser.parse('28/Feb/2013:12:00:00:123456789 +0900').is_a?(Fluent::EventTime)) + + time = str2time('28/Feb/2013:12:00:00:123456789 +0900', '%d/%b/%Y:%H:%M:%S:%N %z') + assert_equal_event_time(time, parser.parse('28/Feb/2013:12:00:00:123456789 +0900')) + end + def test_parse_with_invalid_argument parser = TextParser::TimeParser.new(nil) @@ -113,7 +130,7 @@ def test_parse_with_time_key ) text = '2013-02-28 12:00:00 +0900' parser.parse(text) do |time, record| - assert_equal Time.parse(text).to_i, time + assert_equal Fluent::EventTime.parse(text), time end end @@ -390,14 +407,38 @@ def test_parse_with_invalid_time end end + def test_parse_float_time + parser = TextParser::JSONParser.new + format = "%d/%b/%Y:%H:%M:%S %z" + text = "100.1" + parser.parse("{\"time\":\"#{text}\"}") do |time, record| + assert_equal Time.at(text.to_f).to_i, time.sec + assert_equal Time.at(text.to_f).nsec, time.nsec + end + end + def test_parse_with_keep_time_key parser = TextParser::JSONParser.new + format = "%d/%b/%Y:%H:%M:%S %z" parser.configure( - 'time_format'=>"%d/%b/%Y:%H:%M:%S %z", + 'time_format'=>format, 'keep_time_key'=>'true', ) text = "28/Feb/2013:12:00:00 +0900" parser.parse("{\"time\":\"#{text}\"}") do |time, record| + assert_equal Time.strptime(text, format).to_i, time.sec + assert_equal text, record['time'] + end + end + + def test_parse_with_keep_time_key_without_time_format + parser = TextParser::JSONParser.new + parser.configure( + 'keep_time_key'=>'true', + ) + text = "100" + parser.parse("{\"time\":\"#{text}\"}") do |time, record| + assert_equal text.to_i, time.sec assert_equal text, record['time'] end end