diff --git a/lib/fluent/plugin/out_file.rb b/lib/fluent/plugin/out_file.rb index 9cae15cffe..19ce806255 100644 --- a/lib/fluent/plugin/out_file.rb +++ b/lib/fluent/plugin/out_file.rb @@ -16,42 +16,59 @@ require 'fileutils' require 'zlib' +require 'time' -require 'fluent/output' +require 'fluent/plugin/output' require 'fluent/config/error' -require 'fluent/system_config' +# TODO remove ... +require 'fluent/plugin/file_util' -module Fluent - class FileOutput < TimeSlicedOutput - include SystemConfig::Mixin +module Fluent::Plugin + class FileOutput < Output + Fluent::Plugin.register_output('file', self) - Plugin.register_output('file', self) + helpers :formatter, :inject, :compat_parameters - SUPPORTED_COMPRESS = { - 'gz' => :gz, - 'gzip' => :gz, + SUPPORTED_COMPRESS = [:text, :gz, :gzip] + SUPPORTED_COMPRESS_MAP = { + text: nil, + gz: :gzip, + gzip: :gzip, } FILE_PERMISSION = 0644 DIR_PERMISSION = 0755 + DEFAULT_TIMEKEY = 60 * 60 * 24 + desc "The Path of the file." config_param :path, :string - desc "The format of the file content. The default is out_file." - config_param :format, :string, default: 'out_file', skip_accessor: true + + desc "Specify to add file suffix for bare file path or not." + config_param :add_path_suffix, :bool, default: true + desc "The file suffix added to bare file path." + config_param :path_suffix, :string, default: '.log' desc "The flushed chunk is appended to existence file or not." config_param :append, :bool, default: false desc "Compress flushed file." - config_param :compress, default: nil do |val| - c = SUPPORTED_COMPRESS[val] - unless c - raise ConfigError, "Unsupported compression algorithm '#{val}'" - end - c - end + config_param :compress, :enum, list: SUPPORTED_COMPRESS, default: :text + desc "Execute compression again even when buffer chunk is already compressed." + config_param :recompress, :bool, default: false desc "Create symlink to temporary buffered file when buffer_type is file." config_param :symlink_path, :string, default: nil + config_section :format, init: true do + config_set_default :@type, 'out_file' + end + + config_section :buffer do + config_set_default :@type, 'file' + config_set_default :chunk_keys, ['time'] + config_set_default :timekey, DEFAULT_TIMEKEY + end + + attr_accessor :last_written_path # for tests + module SymlinkBufferMixin def symlink_path=(path) @_symlink_path = path @@ -63,48 +80,56 @@ def generate_chunk(metadata) # timekey will be appended into that file chunk. On the other side, resumed file chunks might NOT # have timekey, especially in the cases that resumed file chunks are generated by Fluentd v0.12. # These chunks will be enqueued immediately, and will be flushed soon. - latest_chunk = metadata_list.select{|m| m.timekey }.sort_by(&:timekey).last - if chunk.metadata == latest_chunk + latest_metadata = metadata_list.select{|m| m.timekey }.sort_by(&:timekey).last + if chunk.metadata == latest_metadata FileUtils.ln_sf(chunk.path, @_symlink_path) end chunk end end - def initialize - require 'zlib' - require 'time' - require 'fluent/plugin/file_util' - super - end - def configure(conf) - if path = conf['path'] - @path = path + compat_parameters_convert(conf, :formatter, :buffer, :inject, default_chunk_key: "time") + + configured_time_slice_format = conf['time_slice_format'] + + # v0.14 file buffer handles path as directory if '*' is missing + # 'dummy_path' is not to raise configuration error for 'path' in file buffer plugin, + # but raise it in this plugin. + if conf.elements(name: 'buffer').empty? + conf.add_element('buffer', 'time') end - unless @path - raise ConfigError, "'path' parameter is required on file output" + buffer_conf = conf.elements(name: 'buffer').first + unless buffer_conf.has_key?('path') + buffer_conf['path'] = conf['path'] || '/tmp/dummy_path' end - if pos = @path.index('*') - @path_prefix = @path[0,pos] - @path_suffix = @path[pos+1..-1] - conf['buffer_path'] ||= "#{@path}" - else - @path_prefix = @path+"." - @path_suffix = ".log" - conf['buffer_path'] ||= "#{@path}.*" + super + + @compress_method = SUPPORTED_COMPRESS_MAP[@compress] + + if @path.include?('*') && !@buffer_config.timekey + raise Fluent::ConfigError, "path including '*' must be used with buffer chunk key 'time'" end - test_path = generate_path(Time.now.strftime(@time_slice_format)) + path_suffix = @add_path_suffix ? @path_suffix : '' + @path_template = generate_path_template(@path, @buffer_config.timekey, @append, @compress_method, path_suffix: path_suffix, time_slice_format: configured_time_slice_format) + + placeholder_validate!(:path, @path_template) + + max_tag_index = get_placeholders_tag(@path_template).max || 1 + max_tag_index = 1 if max_tag_index < 1 + dummy_tag = (['a'] * max_tag_index).join('.') + dummy_record_keys = get_placeholders_keys(@path_template) || ['message'] + dummy_record = Hash[dummy_record_keys.zip(['data'] * dummy_record_keys.size)] + + test_meta1 = metadata_for_test(dummy_tag, Fluent::Engine.now, dummy_record) + test_path = extract_placeholders(@path_template, test_meta1) unless ::Fluent::FileUtil.writable_p?(test_path) - raise ConfigError, "out_file: `#{test_path}` is not writable" + raise Fluent::ConfigError, "out_file: `#{test_path}` is not writable" end - super - - @formatter = Plugin.new_formatter(@format) - @formatter.configure(conf) + @formatter = formatter_create(conf: conf.elements('format').first) if @symlink_path && @buffer.respond_to?(:path) @buffer.extend SymlinkBufferMixin @@ -116,56 +141,101 @@ def configure(conf) end def format(tag, time, record) - @formatter.format(tag, time, record) + r = inject_values_to_record(tag, time, record) + @formatter.format(tag, time, r) end def write(chunk) - path = generate_path(chunk.key) + path = extract_placeholders(@path_template, chunk.metadata) FileUtils.mkdir_p File.dirname(path), mode: @dir_perm - case @compress + unless @append + path = find_filepath_available(path) + end + + case @compress_method when nil - File.open(path, "ab", @file_perm) {|f| + File.open(path, "ab", @file_perm) do |f| chunk.write_to(f) - } - when :gz - File.open(path, "ab", @file_perm) {|f| - gz = Zlib::GzipWriter.new(f) - chunk.write_to(gz) - gz.close - } + end + when :gzip + if @buffer.compress != :gzip || @recompress + File.open(path, "ab", @file_perm) do |f| + gz = Zlib::GzipWriter.new(f) + chunk.write_to(gz, compressed: :text) + gz.close + end + else + File.open(path, "ab", @file_perm) do |f| + chunk.write_to(f, compressed: :gzip) + end + end + else + raise "BUG: unknown compression method #{@compress_method}" end - return path # for test + @last_written_path = path end - def secondary_init(primary) - # don't warn even if primary.class is not FileOutput + def timekey_to_timeformat(timekey) + case timekey + when nil then '' + when 0...60 then '%Y%m%d%H%M%S' # 60 exclusive + when 60...3600 then '%Y%m%d%H%M' + when 3600...86400 then '%Y%m%d%H' + else '%Y%m%d' + end end - private - - def suffix - case @compress - when nil - '' - when :gz - ".gz" + def compression_suffix(compress) + case compress + when :gzip then '.gz' + when nil then '' + else + raise ArgumentError, "unknown compression type #{compress}" end end - def generate_path(time_string) - if @append - "#{@path_prefix}#{time_string}#{@path_suffix}#{suffix}" + # /path/to/dir/file.* -> /path/to/dir/file.%Y%m%d + # /path/to/dir/file.*.data -> /path/to/dir/file.%Y%m%d.data + # /path/to/dir/file -> /path/to/dir/file.%Y%m%d.log + # %Y%m%d -> %Y%m%d_** (non append) + # + .gz (gzipped) + ## TODO: remove time_slice_format when end of support of compat_parameters + def generate_path_template(original, timekey, append, compress, path_suffix: '', time_slice_format: nil) + comp_suffix = compression_suffix(compress) + index_placeholder = append ? '' : '_**' + if original.index('*') + raise "BUG: configuration error must be raised for path including '*' without timekey" unless timekey + time_placeholders_part = time_slice_format || timekey_to_timeformat(timekey) + original.gsub('*', time_placeholders_part + index_placeholder) + comp_suffix else - path = nil - i = 0 - begin - path = "#{@path_prefix}#{time_string}_#{i}#{@path_suffix}#{suffix}" - i += 1 - end while File.exist?(path) - path + if timekey + if time_slice_format + "#{original}.#{time_slice_format}#{index_placeholder}#{path_suffix}#{comp_suffix}" + else + time_placeholders = timekey_to_timeformat(timekey) + if time_placeholders.scan(/../).any?{|ph| original.include?(ph) } + raise Fluent::ConfigError, "insufficient timestamp placeholders in path" if time_placeholders.scan(/../).any?{|ph| !original.include?(ph) } + "#{original}#{index_placeholder}#{path_suffix}#{comp_suffix}" + else + "#{original}.#{time_placeholders}#{index_placeholder}#{path_suffix}#{comp_suffix}" + end + end + else + "#{original}#{index_placeholder}#{path_suffix}#{comp_suffix}" + end + end + end + + def find_filepath_available(path_with_placeholder) # for non-append + raise "BUG: index placeholder not found in path: #{path_with_placeholder}" unless path_with_placeholder.index('_**') + i = 0 + while path = path_with_placeholder.sub('_**', "_#{i}") + break unless File.exist?(path) + i += 1 end + path end end end diff --git a/test/plugin/test_out_file.rb b/test/plugin/test_out_file.rb index 500f827cf5..67f7d25487 100644 --- a/test/plugin/test_out_file.rb +++ b/test/plugin/test_out_file.rb @@ -1,8 +1,10 @@ require_relative '../helper' -require 'fluent/test' +require 'fluent/test/driver/output' require 'fluent/plugin/out_file' require 'fileutils' require 'time' +require 'timecop' +require 'zlib' class FileOutputTest < Test::Unit::TestCase def setup @@ -21,92 +23,299 @@ def setup ] def create_driver(conf = CONFIG) - Fluent::Test::TimeSlicedOutputTestDriver.new(Fluent::FileOutput).configure(conf) + Fluent::Test::Driver::Output.new(Fluent::Plugin::FileOutput).configure(conf) end - def test_configure - d = create_driver %[ - path test_path - compress gz - ] - assert_equal 'test_path', d.instance.path - assert_equal :gz, d.instance.compress - end + sub_test_case 'configuration' do + test 'basic configuration' do + d = create_driver %[ + path test_path + compress gz + ] + assert_equal 'test_path', d.instance.path + assert_equal :gz, d.instance.compress + assert_equal :gzip, d.instance.instance_eval{ @compress_method } + end + + test 'path should be writable' do + assert_nothing_raised do + create_driver %[path #{TMP_DIR}/test_path] + end - def test_path_writable - assert_nothing_raised do - create_driver %[path #{TMP_DIR}/test_path] + assert_nothing_raised do + FileUtils.mkdir_p("#{TMP_DIR}/test_dir") + File.chmod(0777, "#{TMP_DIR}/test_dir") + create_driver %[path #{TMP_DIR}/test_dir/foo/bar/baz] + end + + assert_raise(Fluent::ConfigError) do + FileUtils.mkdir_p("#{TMP_DIR}/test_dir") + File.chmod(0555, "#{TMP_DIR}/test_dir") + create_driver %[path #{TMP_DIR}/test_dir/foo/bar/baz] + end end - assert_nothing_raised do - FileUtils.mkdir_p("#{TMP_DIR}/test_dir") - File.chmod(0777, "#{TMP_DIR}/test_dir") - create_driver %[path #{TMP_DIR}/test_dir/foo/bar/baz] + test 'default timezone is localtime' do + d = create_driver(%[path #{TMP_DIR}/out_file_test]) + time = event_time("2011-01-02 13:14:15 UTC") + + with_timezone(Fluent.windows? ? 'NST-8' : 'Asia/Taipei') do + d.run(default_tag: 'test') do + d.feed(time, {"a"=>1}) + end + end + assert_equal 1, d.formatted.size + assert_equal %[2011-01-02T21:14:15+08:00\ttest\t{"a":1}\n], d.formatted[0] end - assert_raise(Fluent::ConfigError) do - FileUtils.mkdir_p("#{TMP_DIR}/test_dir") - File.chmod(0555, "#{TMP_DIR}/test_dir") - create_driver %[path #{TMP_DIR}/test_dir/foo/bar/baz] + test 'no configuration error raised for basic configuration using "*" (v0.12 style)' do + conf = config_element('match', '**', { + 'path' => "#{TMP_DIR}/test_out.*.log", + 'time_slice_format' => '%Y%m%d', + }) + assert_nothing_raised do + create_driver(conf) + end end - end - def test_default_localtime - d = create_driver(%[path #{TMP_DIR}/out_file_test]) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + test 'configuration error raised if specified directory via template is not writable' do + conf = config_element('match', '**', { + 'path' => "#{TMP_DIR}/prohibited/${tag}/file.%Y%m%d.log", + }, [ config_element('buffer', 'time,tag', {'timekey' => 86400}) ]) + FileUtils.mkdir_p("#{TMP_DIR}/prohibited") + File.chmod(0555, "#{TMP_DIR}/prohibited") + assert_raise Fluent::ConfigError.new("out_file: `#{TMP_DIR}/prohibited/a/file.20161004.log_**.log` is not writable") do + create_driver(conf) + end + end - with_timezone(Fluent.windows? ? 'NST-8' : 'Asia/Taipei') do - d.emit({"a"=>1}, time) - d.expect_format %[2011-01-02T21:14:15+08:00\ttest\t{"a":1}\n] - d.run + test 'configuration using inject/format/buffer sections fully' do + conf = config_element('match', '**', { + 'path' => "#{TMP_DIR}/${tag}/${type}/conf_test.%Y%m%d.%H%M.log", + 'add_path_suffix' => 'false', + 'append' => "true", + 'symlink_path' => "#{TMP_DIR}/conf_test.current.log", + 'compress' => 'gzip', + 'recompress' => 'true', + }, [ + config_element('inject', '', { + 'hostname_key' => 'hostname', + 'hostname' => 'testing.local', + 'tag_key' => 'tag', + 'time_key' => 'time', + 'time_type' => 'string', + 'time_format' => '%Y/%m/%d %H:%M:%S %z', + 'timezone' => '+0900', + }), + config_element('format', '', { + '@type' => 'out_file', + 'include_tag' => 'true', + 'include_time' => 'true', + 'delimiter' => 'COMMA', + 'time_type' => 'string', + 'time_format' => '%Y-%m-%d %H:%M:%S %z', + 'utc' => 'true', + }), + config_element('buffer', 'time,tag,type', { + '@type' => 'file', + 'timekey' => '15m', + 'timekey_wait' => '5s', + 'timekey_zone' => '+0000', + 'path' => "#{TMP_DIR}/buf_conf_test", + 'chunk_limit_size' => '50m', + 'total_limit_size' => '1g', + 'compress' => 'gzip', + }), + ]) + assert_nothing_raised do + create_driver(conf) + end end end - def test_format - d = create_driver + sub_test_case 'fully configured output' do + setup do + Timecop.freeze(Time.parse("2016-10-03 23:58:00 UTC")) + conf = config_element('match', '**', { + 'path' => "#{TMP_DIR}/${tag}/${type}/full.%Y%m%d.%H%M.log", + 'add_path_suffix' => 'false', + 'append' => "true", + 'symlink_path' => "#{TMP_DIR}/full.current.log", + 'compress' => 'gzip', + 'recompress' => 'true', + }, [ + config_element('inject', '', { + 'hostname_key' => 'hostname', + 'hostname' => 'testing.local', + 'tag_key' => 'tag', + 'time_key' => 'time', + 'time_type' => 'string', + 'time_format' => '%Y/%m/%d %H:%M:%S %z', + 'timezone' => '+0900', + }), + config_element('format', '', { + '@type' => 'out_file', + 'include_tag' => 'true', + 'include_time' => 'true', + 'delimiter' => 'COMMA', + 'time_type' => 'string', + 'time_format' => '%Y-%m-%d %H:%M:%S %z', + 'utc' => 'true', + }), + config_element('buffer', 'time,tag,type', { + '@type' => 'file', + 'timekey' => '15m', + 'timekey_wait' => '5s', + 'timekey_zone' => '+0000', + 'path' => "#{TMP_DIR}/buf_full", + 'chunk_limit_size' => '50m', + 'total_limit_size' => '1g', + 'compress' => 'gzip', + }), + ]) + @d = create_driver(conf) + end - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - d.emit({"a"=>1}, time) - d.emit({"a"=>2}, time) + teardown do + FileUtils.rm_rf("#{TMP_DIR}/buf_full") + FileUtils.rm_rf("#{TMP_DIR}/my.data") + FileUtils.rm_rf("#{TMP_DIR}/your.data") + FileUtils.rm_rf("#{TMP_DIR}/full.current.log") + Timecop.return + end - d.expect_format %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] - d.expect_format %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n] + test 'can format/write data correctly' do + d = @d - d.run - end + assert_equal 50*1024*1024, d.instance.buffer.chunk_limit_size + assert_equal 1*1024*1024*1024, d.instance.buffer.total_limit_size - def test_timezone_1 - d = create_driver %[ - path #{TMP_DIR}/out_file_test - timezone Asia/Taipei - ] + assert !(File.symlink?("#{TMP_DIR}/full.current.log")) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + t1 = event_time("2016-10-03 23:58:09 UTC") + t2 = event_time("2016-10-03 23:59:33 UTC") + t3 = event_time("2016-10-03 23:59:57 UTC") + t4 = event_time("2016-10-04 00:00:17 UTC") + t5 = event_time("2016-10-04 00:01:59 UTC") - d.emit({"a"=>1}, time) - d.expect_format %[2011-01-02T21:14:15+08:00\ttest\t{"a":1}\n] - d.run - end + Timecop.freeze(Time.parse("2016-10-03 23:58:30 UTC")) - def test_timezone_2 - d = create_driver %[ - path #{TMP_DIR}/out_file_test - timezone -03:30 - ] + d.run(start: true, flush: false, shutdown: false) do + d.feed('my.data', t1, {"type" => "a", "message" => "data raw content"}) + d.feed('my.data', t2, {"type" => "a", "message" => "data raw content"}) + d.feed('your.data', t3, {"type" => "a", "message" => "data raw content"}) + end + + assert_equal 3, d.formatted.size + + assert Dir.exist?("#{TMP_DIR}/buf_full") + assert !(Dir.exist?("#{TMP_DIR}/my.data/a")) + assert !(Dir.exist?("#{TMP_DIR}/your.data/a")) + buffer_files = Dir.entries("#{TMP_DIR}/buf_full").reject{|e| e =~ /^\.+$/ } + assert_equal 2, buffer_files.select{|n| n.end_with?('.meta') }.size + assert_equal 2, buffer_files.select{|n| !n.end_with?('.meta') }.size + + m1 = d.instance.metadata('my.data', t1, {"type" => "a"}) + m2 = d.instance.metadata('your.data', t3, {"type" => "a"}) + + assert_equal 2, d.instance.buffer.stage.size + b1_path = d.instance.buffer.stage[m1].path + b1_size = File.lstat(b1_path).size + + assert File.symlink?("#{TMP_DIR}/full.current.log") + assert_equal d.instance.buffer.stage[m2].path, File.readlink("#{TMP_DIR}/full.current.log") + + Timecop.freeze(Time.parse("2016-10-04 00:00:06 UTC")) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + d.run(start: false, flush: true, shutdown: true) do + d.feed('my.data', t4, {"type" => "a", "message" => "data raw content"}) + d.feed('your.data', t5, {"type" => "a", "message" => "data raw content"}) + end - d.emit({"a"=>1}, time) - d.expect_format %[2011-01-02T09:44:15-03:30\ttest\t{"a":1}\n] - d.run + assert Dir.exist?("#{TMP_DIR}/buf_full") + assert Dir.exist?("#{TMP_DIR}/my.data/a") + assert Dir.exist?("#{TMP_DIR}/your.data/a") + + buffer_files = Dir.entries("#{TMP_DIR}/buf_full").reject{|e| e =~ /^\.+$/ } + assert_equal 0, buffer_files.size + + assert File.exist?("#{TMP_DIR}/my.data/a/full.20161003.2345.log.gz") + assert File.exist?("#{TMP_DIR}/my.data/a/full.20161004.0000.log.gz") + assert File.exist?("#{TMP_DIR}/your.data/a/full.20161003.2345.log.gz") + assert File.exist?("#{TMP_DIR}/your.data/a/full.20161004.0000.log.gz") + + assert{ File.lstat("#{TMP_DIR}/my.data/a/full.20161003.2345.log.gz").size < b1_size } # recompress + + assert_equal 5, d.formatted.size + + r1 = %!2016-10-03 23:58:09 +0000,my.data,{"type":"a","message":"data raw content","hostname":"testing.local","tag":"my.data","time":"2016/10/04 08:58:09 +0900"}\n! + r2 = %!2016-10-03 23:59:33 +0000,my.data,{"type":"a","message":"data raw content","hostname":"testing.local","tag":"my.data","time":"2016/10/04 08:59:33 +0900"}\n! + r3 = %!2016-10-03 23:59:57 +0000,your.data,{"type":"a","message":"data raw content","hostname":"testing.local","tag":"your.data","time":"2016/10/04 08:59:57 +0900"}\n! + r4 = %!2016-10-04 00:00:17 +0000,my.data,{"type":"a","message":"data raw content","hostname":"testing.local","tag":"my.data","time":"2016/10/04 09:00:17 +0900"}\n! + r5 = %!2016-10-04 00:01:59 +0000,your.data,{"type":"a","message":"data raw content","hostname":"testing.local","tag":"your.data","time":"2016/10/04 09:01:59 +0900"}\n! + assert_equal r1, d.formatted[0] + assert_equal r2, d.formatted[1] + assert_equal r3, d.formatted[2] + assert_equal r4, d.formatted[3] + assert_equal r5, d.formatted[4] + + read_gunzip = ->(path){ File.open(path){|fio| Zlib::GzipReader.open(fio){|io| io.read } } } + assert_equal r1 + r2, read_gunzip.call("#{TMP_DIR}/my.data/a/full.20161003.2345.log.gz") + assert_equal r3, read_gunzip.call("#{TMP_DIR}/your.data/a/full.20161003.2345.log.gz") + assert_equal r4, read_gunzip.call("#{TMP_DIR}/my.data/a/full.20161004.0000.log.gz") + assert_equal r5, read_gunzip.call("#{TMP_DIR}/your.data/a/full.20161004.0000.log.gz") + end end - def test_timezone_invalid - assert_raise(Fluent::ConfigError) do - create_driver %[ + sub_test_case 'format' do + test 'timezone UTC specified' do + d = create_driver + + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: 'test') do + d.feed(time, {"a"=>1}) + d.feed(time, {"a"=>2}) + end + assert_equal 2, d.formatted.size + assert_equal %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n], d.formatted[0] + assert_equal %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n], d.formatted[1] + end + + test 'time formatted with specified timezone, using area name' do + d = create_driver %[ path #{TMP_DIR}/out_file_test - timezone Invalid/Invalid + timezone Asia/Taipei ] + + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: 'test') do + d.feed(time, {"a"=>1}) + end + assert_equal 1, d.formatted.size + assert_equal %[2011-01-02T21:14:15+08:00\ttest\t{"a":1}\n], d.formatted[0] + end + + test 'time formatted with specified timezone, using offset' do + d = create_driver %[ + path #{TMP_DIR}/out_file_test + timezone -03:30 + ] + + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: 'test') do + d.feed(time, {"a"=>1}) + end + assert_equal 1, d.formatted.size + assert_equal %[2011-01-02T09:44:15-03:30\ttest\t{"a":1}\n], d.formatted[0] + end + + test 'configuration error raised for invalid timezone' do + assert_raise(Fluent::ConfigError) do + create_driver %[ + path #{TMP_DIR}/out_file_test + timezone Invalid/Invalid + ] + end end end @@ -128,22 +337,24 @@ def check_gzipped_result(path, expect) assert_equal expect, result end - def test_write - d = create_driver + sub_test_case 'write' do + test 'basic case' do + d = create_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - d.emit({"a"=>1}, time) - d.emit({"a"=>2}, time) + assert_false File.exist?("#{TMP_DIR}/out_file_test.20110102_0.log.gz") - # FileOutput#write returns path - paths = d.run - expect_paths = ["#{TMP_DIR}/out_file_test.20110102_0.log.gz"] - assert_equal expect_paths, paths + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: 'test') do + d.feed(time, {"a"=>1}) + d.feed(time, {"a"=>2}) + end - check_gzipped_result(paths[0], %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n]) + assert File.exist?("#{TMP_DIR}/out_file_test.20110102_0.log.gz") + check_gzipped_result("#{TMP_DIR}/out_file_test.20110102_0.log.gz", %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n]) + end end - class TestWithSystem < self + sub_test_case 'file/directory permissions' do TMP_DIR_WITH_SYSTEM = File.expand_path(File.dirname(__FILE__) + "/../tmp/out_file_system#{ENV['TEST_ENV_NUMBER']}") # 0750 interprets as "488". "488".to_i(8) # => 4. So, it makes wrong permission. Umm.... OVERRIDE_DIR_PERMISSION = 750 @@ -158,7 +369,7 @@ class TestWithSystem < self ] - def setup + setup do omit "NTFS doesn't support UNIX like permissions" if Fluent.windows? FileUtils.rm_rf(TMP_DIR_WITH_SYSTEM) end @@ -168,97 +379,104 @@ def parse_system(text) Fluent::Config.parse(text, '(test)', basepath, true).elements.find { |e| e.name == 'system' } end - def test_write_with_system + test 'write to file with permission specifications' do system_conf = parse_system(CONFIG_WITH_SYSTEM) sc = Fluent::SystemConfig.new(system_conf) Fluent::Engine.init(sc) d = create_driver CONFIG_WITH_SYSTEM - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - d.emit({"a"=>1}, time) - d.emit({"a"=>2}, time) + assert_false File.exist?("#{TMP_DIR_WITH_SYSTEM}/out_file_test.20110102_0.log.gz") + + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: 'test') do + d.feed(time, {"a"=>1}) + d.feed(time, {"a"=>2}) + end - # FileOutput#write returns path - paths = d.run - expect_paths = ["#{TMP_DIR_WITH_SYSTEM}/out_file_test.20110102_0.log.gz"] - assert_equal expect_paths, paths + assert File.exist?("#{TMP_DIR_WITH_SYSTEM}/out_file_test.20110102_0.log.gz") - check_gzipped_result(paths[0], %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n]) + check_gzipped_result("#{TMP_DIR_WITH_SYSTEM}/out_file_test.20110102_0.log.gz", %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n]) dir_mode = "%o" % File::stat(TMP_DIR_WITH_SYSTEM).mode assert_equal(OVERRIDE_DIR_PERMISSION, dir_mode[-3, 3].to_i) - file_mode = "%o" % File::stat(paths[0]).mode + file_mode = "%o" % File::stat("#{TMP_DIR_WITH_SYSTEM}/out_file_test.20110102_0.log.gz").mode assert_equal(OVERRIDE_FILE_PERMISSION, file_mode[-3, 3].to_i) end end - def test_write_with_format_json - d = create_driver [CONFIG, 'format json', 'include_time_key true', 'time_as_epoch'].join("\n") + sub_test_case 'format specified' do + test 'json' do + d = create_driver [CONFIG, 'format json', 'include_time_key true', 'time_as_epoch'].join("\n") - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - d.emit({"a"=>1}, time) - d.emit({"a"=>2}, time) + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: 'test') do + d.feed(time, {"a"=>1}) + d.feed(time, {"a"=>2}) + end - # FileOutput#write returns path - paths = d.run - check_gzipped_result(paths[0], %[#{Yajl.dump({"a" => 1, 'time' => time})}\n] + %[#{Yajl.dump({"a" => 2, 'time' => time})}\n]) - end + path = d.instance.last_written_path + check_gzipped_result(path, %[#{Yajl.dump({"a" => 1, 'time' => time.to_i})}\n] + %[#{Yajl.dump({"a" => 2, 'time' => time.to_i})}\n]) + end - def test_write_with_format_ltsv - d = create_driver [CONFIG, 'format ltsv', 'include_time_key true'].join("\n") + test 'ltsv' do + d = create_driver [CONFIG, 'format ltsv', 'include_time_key true'].join("\n") - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - d.emit({"a"=>1}, time) - d.emit({"a"=>2}, time) + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: 'test') do + d.feed(time, {"a"=>1}) + d.feed(time, {"a"=>2}) + end - # FileOutput#write returns path - paths = d.run - check_gzipped_result(paths[0], %[a:1\ttime:2011-01-02T13:14:15Z\n] + %[a:2\ttime:2011-01-02T13:14:15Z\n]) - end + path = d.instance.last_written_path + check_gzipped_result(path, %[a:1\ttime:2011-01-02T13:14:15Z\n] + %[a:2\ttime:2011-01-02T13:14:15Z\n]) + end - def test_write_with_format_single_value - d = create_driver [CONFIG, 'format single_value', 'message_key a'].join("\n") + test 'single_value' do + d = create_driver [CONFIG, 'format single_value', 'message_key a'].join("\n") - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - d.emit({"a"=>1}, time) - d.emit({"a"=>2}, time) + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: 'test') do + d.feed(time, {"a"=>1}) + d.feed(time, {"a"=>2}) + end - # FileOutput#write returns path - paths = d.run - check_gzipped_result(paths[0], %[1\n] + %[2\n]) + path = d.instance.last_written_path + check_gzipped_result(path, %[1\n] + %[2\n]) + end end - def test_write_path_increment - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + test 'path with index number' do + time = event_time("2011-01-02 13:14:15 UTC") formatted_lines = %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n] write_once = ->(){ d = create_driver - d.emit({"a"=>1}, time) - d.emit({"a"=>2}, time) - d.run + d.run(default_tag: 'test'){ + d.feed(time, {"a"=>1}) + d.feed(time, {"a"=>2}) + } + d.instance.last_written_path } assert !File.exist?("#{TMP_DIR}/out_file_test.20110102_0.log.gz") - # FileOutput#write returns path - paths = write_once.call - assert_equal ["#{TMP_DIR}/out_file_test.20110102_0.log.gz"], paths - check_gzipped_result(paths[0], formatted_lines) + path = write_once.call + assert_equal "#{TMP_DIR}/out_file_test.20110102_0.log.gz", path + check_gzipped_result(path, formatted_lines) assert_equal 1, Dir.glob("#{TMP_DIR}/out_file_test.*").size - paths = write_once.call - assert_equal ["#{TMP_DIR}/out_file_test.20110102_1.log.gz"], paths - check_gzipped_result(paths[0], formatted_lines) + path = write_once.call + assert_equal "#{TMP_DIR}/out_file_test.20110102_1.log.gz", path + check_gzipped_result(path, formatted_lines) assert_equal 2, Dir.glob("#{TMP_DIR}/out_file_test.*").size - paths = write_once.call - assert_equal ["#{TMP_DIR}/out_file_test.20110102_2.log.gz"], paths - check_gzipped_result(paths[0], formatted_lines) + path = write_once.call + assert_equal "#{TMP_DIR}/out_file_test.20110102_2.log.gz", path + check_gzipped_result(path, formatted_lines) assert_equal 3, Dir.glob("#{TMP_DIR}/out_file_test.*").size end - def test_write_with_append - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + test 'append' do + time = event_time("2011-01-02 13:14:15 UTC") formatted_lines = %[2011-01-02T13:14:15Z\ttest\t{"a":1}\n] + %[2011-01-02T13:14:15Z\ttest\t{"a":2}\n] write_once = ->(){ @@ -268,53 +486,52 @@ def test_write_with_append utc append true ] - d.emit({"a"=>1}, time) - d.emit({"a"=>2}, time) - d.run + d.run(default_tag: 'test'){ + d.feed(time, {"a"=>1}) + d.feed(time, {"a"=>2}) + } + d.instance.last_written_path } - # FileOutput#write returns path - paths = write_once.call - assert_equal ["#{TMP_DIR}/out_file_test.20110102.log.gz"], paths - check_gzipped_result(paths[0], formatted_lines) - paths = write_once.call - assert_equal ["#{TMP_DIR}/out_file_test.20110102.log.gz"], paths - check_gzipped_result(paths[0], formatted_lines * 2) - paths = write_once.call - assert_equal ["#{TMP_DIR}/out_file_test.20110102.log.gz"], paths - check_gzipped_result(paths[0], formatted_lines * 3) + path = write_once.call + assert_equal "#{TMP_DIR}/out_file_test.20110102.log.gz", path + check_gzipped_result(path, formatted_lines) + + path = write_once.call + assert_equal "#{TMP_DIR}/out_file_test.20110102.log.gz", path + check_gzipped_result(path, formatted_lines * 2) + + path = write_once.call + assert_equal "#{TMP_DIR}/out_file_test.20110102.log.gz", path + check_gzipped_result(path, formatted_lines * 3) end - def test_write_with_symlink + test 'symlink' do omit "Windows doesn't support symlink" if Fluent.windows? conf = CONFIG + %[ symlink_path #{SYMLINK_PATH} ] symlink_path = "#{SYMLINK_PATH}" - d = Fluent::Test::TestDriver.new(Fluent::FileOutput).configure(conf) - + d = create_driver(conf) begin - d.instance.start - 10.times { sleep 0.05 } - - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - es = Fluent::OneEventStream.new(time, {"a"=>1}) - d.instance.emit_events('tag', es) + d.run(default_tag: 'tag') do + es = Fluent::OneEventStream.new(event_time("2011-01-02 13:14:15 UTC"), {"a"=>1}) + d.feed(es) - assert File.exist?(symlink_path) - assert File.symlink?(symlink_path) + assert File.symlink?(symlink_path) + assert File.exist?(symlink_path) # This checks dest of symlink exists or not. - es = Fluent::OneEventStream.new(event_time("2011-01-03 14:15:16 UTC"), {"a"=>2}) - d.instance.emit_events('tag', es) + es = Fluent::OneEventStream.new(event_time("2011-01-03 14:15:16 UTC"), {"a"=>2}) + d.feed(es) - assert File.exist?(symlink_path) - assert File.symlink?(symlink_path) + assert File.symlink?(symlink_path) + assert File.exist?(symlink_path) - meta = d.instance.metadata('tag', event_time("2011-01-03 14:15:16 UTC"), {}) - assert_equal d.instance.buffer.instance_eval{ @stage[meta].path }, File.readlink(symlink_path) + meta = d.instance.metadata('tag', event_time("2011-01-03 14:15:16 UTC"), {}) + assert_equal d.instance.buffer.instance_eval{ @stage[meta].path }, File.readlink(symlink_path) + end ensure - d.instance.shutdown FileUtils.rm_rf(symlink_path) end end @@ -326,11 +543,12 @@ def test_write_with_symlink time_slice_format %Y-%m-%d-%H utc true ]) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - d.emit({"a"=>1}, time) - # FileOutput#write returns path - paths = d.run - assert_equal ["#{TMP_DIR}/out_file_test.2011-01-02-13_0.log"], paths + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: 'test') do + d.feed(time, {"a"=>1}) + end + path = d.instance.last_written_path + assert_equal "#{TMP_DIR}/out_file_test.2011-01-02-13_0.log", path end test 'normal with append' do @@ -340,22 +558,26 @@ def test_write_with_symlink utc true append true ]) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - d.emit({"a"=>1}, time) - paths = d.run - assert_equal ["#{TMP_DIR}/out_file_test.2011-01-02-13.log"], paths - end + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: 'test') do + d.feed(time, {"a"=>1}) + end + path = d.instance.last_written_path + assert_equal "#{TMP_DIR}/out_file_test.2011-01-02-13.log", path + end - test '*' do + test '*' do d = create_driver(%[ path #{TMP_DIR}/out_file_test.*.txt time_slice_format %Y-%m-%d-%H utc true ]) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - d.emit({"a"=>1}, time) - paths = d.run - assert_equal ["#{TMP_DIR}/out_file_test.2011-01-02-13_0.txt"], paths + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: 'test') do + d.feed(time, {"a"=>1}) + end + path = d.instance.last_written_path + assert_equal "#{TMP_DIR}/out_file_test.2011-01-02-13_0.txt", path end test '* with append' do @@ -365,11 +587,228 @@ def test_write_with_symlink utc true append true ]) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i - d.emit({"a"=>1}, time) - paths = d.run - assert_equal ["#{TMP_DIR}/out_file_test.2011-01-02-13.txt"], paths + time = event_time("2011-01-02 13:14:15 UTC") + d.run(default_tag: 'test') do + d.feed(time, {"a"=>1}) + end + path = d.instance.last_written_path + assert_equal "#{TMP_DIR}/out_file_test.2011-01-02-13.txt", path end end -end + sub_test_case '#timekey_to_timeformat' do + setup do + @d = create_driver + @i = @d.instance + end + + test 'returns empty string for nil' do + assert_equal '', @i.timekey_to_timeformat(nil) + end + + test 'returns timestamp string with seconds for timekey smaller than 60' do + assert_equal '%Y%m%d%H%M%S', @i.timekey_to_timeformat(1) + assert_equal '%Y%m%d%H%M%S', @i.timekey_to_timeformat(30) + assert_equal '%Y%m%d%H%M%S', @i.timekey_to_timeformat(59) + end + + test 'returns timestamp string with minutes for timekey smaller than 3600' do + assert_equal '%Y%m%d%H%M', @i.timekey_to_timeformat(60) + assert_equal '%Y%m%d%H%M', @i.timekey_to_timeformat(180) + assert_equal '%Y%m%d%H%M', @i.timekey_to_timeformat(1800) + assert_equal '%Y%m%d%H%M', @i.timekey_to_timeformat(3599) + end + + test 'returns timestamp string with hours for timekey smaller than 86400 (1 day)' do + assert_equal '%Y%m%d%H', @i.timekey_to_timeformat(3600) + assert_equal '%Y%m%d%H', @i.timekey_to_timeformat(7200) + assert_equal '%Y%m%d%H', @i.timekey_to_timeformat(86399) + end + + test 'returns timestamp string with days for timekey equal or greater than 86400' do + assert_equal '%Y%m%d', @i.timekey_to_timeformat(86400) + assert_equal '%Y%m%d', @i.timekey_to_timeformat(1000000) + assert_equal '%Y%m%d', @i.timekey_to_timeformat(1000000000) + end + end + + sub_test_case '#compression_suffix' do + setup do + @i = create_driver.instance + end + + test 'returns empty string for nil (no compression method specified)' do + assert_equal '', @i.compression_suffix(nil) + end + + test 'returns .gz for gzip' do + assert_equal '.gz', @i.compression_suffix(:gzip) + end + end + + sub_test_case '#generate_path_template' do + setup do + @i = create_driver.instance + end + + data( + 'day' => [86400, '%Y%m%d', '%Y-%m-%d'], + 'hour' => [3600, '%Y%m%d%H', '%Y-%m-%d_%H'], + 'minute' => [60, '%Y%m%d%H%M', '%Y-%m-%d_%H%M'], + ) + test 'generates path with timestamp placeholder for original path with tailing star with timekey' do |data| + timekey, placeholder, time_slice_format = data + # with index placeholder, without compression suffix when append disabled and compression disabled + assert_equal "/path/to/file.#{placeholder}_**", @i.generate_path_template('/path/to/file.*', timekey, false, nil) + # with index placeholder, with .gz suffix when append disabled and gzip compression enabled + assert_equal "/path/to/file.#{placeholder}_**.gz", @i.generate_path_template('/path/to/file.*', timekey, false, :gzip) + # without index placeholder, without compression suffix when append enabled and compression disabled + assert_equal "/path/to/file.#{placeholder}", @i.generate_path_template('/path/to/file.*', timekey, true, nil) + # without index placeholder, with .gz suffix when append disabled and gzip compression enabled + assert_equal "/path/to/file.#{placeholder}.gz", @i.generate_path_template('/path/to/file.*', timekey, true, :gzip) + + # time_slice_format will used instead of computed placeholder if specified + assert_equal "/path/to/file.#{time_slice_format}_**", @i.generate_path_template('/path/to/file.*', timekey, false, nil, time_slice_format: time_slice_format) + assert_equal "/path/to/file.#{time_slice_format}_**.gz", @i.generate_path_template('/path/to/file.*', timekey, false, :gzip, time_slice_format: time_slice_format) + assert_equal "/path/to/file.#{time_slice_format}", @i.generate_path_template('/path/to/file.*', timekey, true, nil, time_slice_format: time_slice_format) + assert_equal "/path/to/file.#{time_slice_format}.gz", @i.generate_path_template('/path/to/file.*', timekey, true, :gzip, time_slice_format: time_slice_format) + end + + data( + 'day' => [86400 * 2, '%Y%m%d', '%Y-%m-%d'], + 'hour' => [7200, '%Y%m%d%H', '%Y-%m-%d_%H'], + 'minute' => [180, '%Y%m%d%H%M', '%Y-%m-%d_%H%M'], + ) + test 'generates path with timestamp placeholder for original path with star and suffix with timekey' do |data| + timekey, placeholder, time_slice_format = data + # with index placeholder, without compression suffix when append disabled and compression disabled + assert_equal "/path/to/file.#{placeholder}_**.data", @i.generate_path_template('/path/to/file.*.data', timekey, false, nil) + # with index placeholder, with .gz suffix when append disabled and gzip compression enabled + assert_equal "/path/to/file.#{placeholder}_**.data.gz", @i.generate_path_template('/path/to/file.*.data', timekey, false, :gzip) + # without index placeholder, without compression suffix when append enabled and compression disabled + assert_equal "/path/to/file.#{placeholder}.data", @i.generate_path_template('/path/to/file.*.data', timekey, true, nil) + # without index placeholder, with .gz suffix when append disabled and gzip compression enabled + assert_equal "/path/to/file.#{placeholder}.data.gz", @i.generate_path_template('/path/to/file.*.data', timekey, true, :gzip) + + # time_slice_format will used instead of computed placeholder if specified + assert_equal "/path/to/file.#{time_slice_format}_**.data", @i.generate_path_template('/path/to/file.*.data', timekey, false, nil, time_slice_format: time_slice_format) + assert_equal "/path/to/file.#{time_slice_format}_**.data.gz", @i.generate_path_template('/path/to/file.*.data', timekey, false, :gzip, time_slice_format: time_slice_format) + assert_equal "/path/to/file.#{time_slice_format}.data", @i.generate_path_template('/path/to/file.*.data', timekey, true, nil, time_slice_format: time_slice_format) + assert_equal "/path/to/file.#{time_slice_format}.data.gz", @i.generate_path_template('/path/to/file.*.data', timekey, true, :gzip, time_slice_format: time_slice_format) + end + + test 'raise error to show it is a bug when path including * specified without timekey' do + assert_raise "BUG: configuration error must be raised for path including '*' without timekey" do + @i.generate_path_template('/path/to/file.*.log', nil, false, nil) + end + end + + data( + 'day' => [86400 * 7, '%Y%m%d', '%Y-%m-%d'], + 'hour' => [3600 * 6, '%Y%m%d%H', '%Y-%m-%d_%H'], + 'minute' => [60 * 15, '%Y%m%d%H%M', '%Y-%m-%d_%H%M'], + ) + test 'generates path with timestamp placeholder for original path without time placeholders & star with timekey, and path_suffix configured' do |data| + timekey, placeholder, time_slice_format = data + # with index placeholder, without compression suffix when append disabled and compression disabled + assert_equal "/path/to/file.#{placeholder}_**.log", @i.generate_path_template('/path/to/file', timekey, false, nil, path_suffix: '.log') + # with index placeholder, with .gz suffix when append disabled and gzip compression enabled + assert_equal "/path/to/file.#{placeholder}_**.log.gz", @i.generate_path_template('/path/to/file', timekey, false, :gzip, path_suffix: '.log') + # without index placeholder, without compression suffix when append enabled and compression disabled + assert_equal "/path/to/file.#{placeholder}.log", @i.generate_path_template('/path/to/file', timekey, true, nil, path_suffix: '.log') + # without index placeholder, with compression suffix when append enabled and gzip compression enabled + assert_equal "/path/to/file.#{placeholder}.log.gz", @i.generate_path_template('/path/to/file', timekey, true, :gzip, path_suffix: '.log') + + # time_slice_format will be appended always if it's specified + assert_equal "/path/to/file.#{time_slice_format}_**.log", @i.generate_path_template('/path/to/file', timekey, false, nil, path_suffix: '.log', time_slice_format: time_slice_format) + assert_equal "/path/to/file.#{time_slice_format}_**.log.gz", @i.generate_path_template('/path/to/file', timekey, false, :gzip, path_suffix: '.log', time_slice_format: time_slice_format) + assert_equal "/path/to/file.#{time_slice_format}.log", @i.generate_path_template('/path/to/file', timekey, true, nil, path_suffix: '.log', time_slice_format: time_slice_format) + assert_equal "/path/to/file.#{time_slice_format}.log.gz", @i.generate_path_template('/path/to/file', timekey, true, :gzip, path_suffix: '.log', time_slice_format: time_slice_format) + end + + data( + 'day' => [86400, '%Y%m%d', '%Y-%m-%d'], + 'hour' => [3600, '%Y%m%d%H', '%Y-%m-%d_%H'], + 'minute' => [60, '%Y%m%d%H%M', '%Y-%m-%d_%H%M'], + ) + test 'generates path with timestamp placeholder for original path without star with timekey, and path_suffix not configured' do |data| + timekey, placeholder, time_slice_format = data + # with index placeholder, without compression suffix when append disabled and compression disabled + assert_equal "/path/to/file.#{placeholder}_**", @i.generate_path_template('/path/to/file', timekey, false, nil) + # with index placeholder, with .gz suffix when append disabled and gzip compression enabled + assert_equal "/path/to/file.#{placeholder}_**.gz", @i.generate_path_template('/path/to/file', timekey, false, :gzip) + # without index placeholder, without compression suffix when append enabled and compression disabled + assert_equal "/path/to/file.#{placeholder}", @i.generate_path_template('/path/to/file', timekey, true, nil) + # without index placeholder, with compression suffix when append enabled and gzip compression enabled + assert_equal "/path/to/file.#{placeholder}.gz", @i.generate_path_template('/path/to/file', timekey, true, :gzip) + end + + test 'generates path without adding timestamp placeholder part if original path has enough placeholders for specified timekey' do + assert_equal "/path/to/file.%Y%m%d", @i.generate_path_template('/path/to/file.%Y%m%d', 86400, true, nil) + assert_equal "/path/to/%Y%m%d/file", @i.generate_path_template('/path/to/%Y%m%d/file', 86400, true, nil) + + assert_equal "/path/to/%Y%m%d/file_**", @i.generate_path_template('/path/to/%Y%m%d/file', 86400, false, nil) + + assert_raise Fluent::ConfigError.new("insufficient timestamp placeholders in path") do + @i.generate_path_template('/path/to/%Y%m/file', 86400, true, nil) + end + assert_raise Fluent::ConfigError.new("insufficient timestamp placeholders in path") do + @i.generate_path_template('/path/to/file.%Y%m%d.log', 3600, true, nil) + end + + assert_equal "/path/to/file.%Y%m%d_%H_**.log.gz", @i.generate_path_template('/path/to/file.%Y%m%d_%H', 7200, false, :gzip, path_suffix: '.log') + assert_equal "/path/to/${tag}/file.%Y%m%d_%H_**.log.gz", @i.generate_path_template('/path/to/${tag}/file.%Y%m%d_%H', 7200, false, :gzip, path_suffix: '.log') + end + + test 'generates path with specified time_slice_format appended even if path has sufficient timestamp placeholders' do + assert_equal "/path/to/%Y%m%d/file.%Y-%m-%d_%H_**", @i.generate_path_template('/path/to/%Y%m%d/file', 86400, false, nil, time_slice_format: '%Y-%m-%d_%H') + assert_equal "/path/to/%Y%m%d/file.%Y-%m-%d_%H", @i.generate_path_template('/path/to/%Y%m%d/file', 86400, true, nil, time_slice_format: '%Y-%m-%d_%H') + assert_equal "/path/to/%Y%m%d/file.%Y-%m-%d_%H_**.log", @i.generate_path_template('/path/to/%Y%m%d/file', 86400, false, nil, time_slice_format: '%Y-%m-%d_%H', path_suffix: '.log') + assert_equal "/path/to/%Y%m%d/file.%Y-%m-%d_%H.log", @i.generate_path_template('/path/to/%Y%m%d/file', 86400, true, nil, time_slice_format: '%Y-%m-%d_%H', path_suffix: '.log') + assert_equal "/path/to/%Y%m%d/file.%Y-%m-%d_%H.log.gz", @i.generate_path_template('/path/to/%Y%m%d/file', 86400, true, :gzip, time_slice_format: '%Y-%m-%d_%H', path_suffix: '.log') + end + + test 'generates path without timestamp placeholder when path does not include * and timekey not specified' do + assert_equal '/path/to/file.log', @i.generate_path_template('/path/to/file.log', nil, true, nil) + assert_equal '/path/to/file.log_**', @i.generate_path_template('/path/to/file.log', nil, false, nil) + assert_equal '/path/to/file.${tag}.log_**', @i.generate_path_template('/path/to/file.${tag}.log', nil, false, nil) + assert_equal '/path/to/file.${tag}_**.log', @i.generate_path_template('/path/to/file.${tag}', nil, false, nil, path_suffix: '.log') + end + end + + sub_test_case '#find_filepath_available' do + setup do + @tmp = File.join(TMP_DIR, 'find_filepath_test') + FileUtils.mkdir_p @tmp + @i = create_driver.instance + end + + teardown do + FileUtils.rm_rf @tmp + end + + test 'raise error if argument path does not include index placeholder' do + assert_raise "BUG: index placeholder not found in path: #{@tmp}/myfile" do + @i.find_filepath_available("#{@tmp}/myfile") + end + end + + data( + 'without suffix' => ['myfile_0', 'myfile_**'], + 'with timestamp' => ['myfile_20161003_0', 'myfile_20161003_**'], + 'with base suffix' => ['myfile_0.log', 'myfile_**.log'], + 'with compression suffix' => ['myfile_0.log.gz', 'myfile_**.log.gz'], + ) + test 'returns filepath with _0 at first' do |data| + expected, argument = data + assert_equal File.join(@tmp, expected), @i.find_filepath_available(File.join(@tmp, argument)) + end + + test 'returns filepath with index which does not exist yet' do + 5.times do |i| + File.open(File.join(@tmp, "exist_#{i}.log"), 'a') + end + assert_equal File.join(@tmp, "exist_5.log"), @i.find_filepath_available(File.join(@tmp, "exist_**.log")) + end + end +end