diff --git a/lib/fluent/plugin/in_syslog.rb b/lib/fluent/plugin/in_syslog.rb index e6fcd56b1c..2cd410d827 100644 --- a/lib/fluent/plugin/in_syslog.rb +++ b/lib/fluent/plugin/in_syslog.rb @@ -74,7 +74,7 @@ class SyslogInput < Input desc 'The prefix of the tag. The tag itself is generated by the tag prefix, facility level, and priority.' config_param :tag, :string desc 'The transport protocol used to receive logs.(udp, tcp)' - config_param :protocol_type, :enum, list: [:tcp, :udp], default: :udp + config_param :protocol_type, :enum, list: [:tcp, :udp], default: nil, deprecated: "use transport directive" desc 'The message frame type.(traditional, octet_count)' config_param :frame_type, :enum, list: [:traditional, :octet_count], default: :traditional @@ -107,6 +107,11 @@ class SyslogInput < Input config_param :with_priority, :bool, default: true end + # overwrite server plugin to change default to :udp + config_section :transport, required: false, multi: false, init: true, param_name: :transport_config do + config_argument :protocol, :enum, list: [:tcp, :udp, :tls], default: :udp + end + def configure(conf) compat_parameters_convert(conf, :parser) @@ -141,12 +146,13 @@ def multi_workers_ready? def start super - log.info "listening syslog socket on #{@bind}:#{@port} with #{@protocol_type}" - case @protocol_type + log.info "listening syslog socket on #{@bind}:#{@port} with #{@protocol_type || @transport_config.protocol}" + case @protocol_type || @transport_config.protocol when :udp then start_udp_server when :tcp then start_tcp_server + when :tls then start_tcp_server(tls: true) else - raise "BUG: invalid protocol_type value:#{@protocol_type}" + raise "BUG: invalid transport value: #{@protocol_type || @transport_config.protocol}" end end @@ -156,12 +162,12 @@ def start_udp_server end end - def start_tcp_server + def start_tcp_server(tls: false) octet_count_frame = @frame_type == :octet_count delimiter = octet_count_frame ? " " : @delimiter delimiter_size = delimiter.size - server_create_connection(:in_syslog_tcp_server, @port, bind: @bind, resolve_name: @resolve_hostname) do |conn| + server_create_connection(tls ? :in_syslog_tls_server : :in_syslog_tcp_server, @port, bind: @bind, resolve_name: @resolve_hostname) do |conn| conn.data do |data| buffer = conn.buffer buffer << data diff --git a/test/plugin/test_in_syslog.rb b/test/plugin/test_in_syslog.rb index b66ff66f24..f75fb87b64 100755 --- a/test/plugin/test_in_syslog.rb +++ b/test/plugin/test_in_syslog.rb @@ -55,6 +55,35 @@ def test_configure_resolve_hostname(param) end end + data('Use protocol_type' => ['protocol_type tcp', :tcp, :udp], + 'Use transport' => ["\n ", nil, :tcp], + 'Use transport and protocol' => ["protocol_type udp\n\n ", :udp, :tcp]) + def test_configure_protocol(param) + conf, proto_type, transport_proto_type = *param + d = create_driver([CONFIG, conf].join("\n")) + + assert_equal(d.instance.protocol_type, proto_type) + assert_equal(d.instance.transport_config.protocol, transport_proto_type) + end + + # For backward compat + def test_respect_protocol_type_than_transport + d = create_driver([CONFIG, " \n", "protocol_type udp"].join("\n")) + tests = create_test_case + + d.run(expect_emits: 2) do + u = UDPSocket.new + u.connect('127.0.0.1', PORT) + tests.each {|test| + u.send(test['msg'], 0) + } + end + + assert(d.events.size > 0) + compare_test_result(d.events, tests) + end + + data( ipv4: ['127.0.0.1', CONFIG, ::Socket::AF_INET], ipv6: ['::1', IPv6_CONFIG, ::Socket::AF_INET6], @@ -119,7 +148,7 @@ def test_msg_size_udp_for_large_msg end def test_msg_size_with_tcp - d = create_driver([CONFIG, 'protocol_type tcp'].join("\n")) + d = create_driver([CONFIG, " \n"].join("\n")) tests = create_test_case d.run(expect_emits: 2) do @@ -135,7 +164,7 @@ def test_msg_size_with_tcp end def test_msg_size_with_same_tcp_connection - d = create_driver([CONFIG, 'protocol_type tcp'].join("\n")) + d = create_driver([CONFIG, " \n"].join("\n")) tests = create_test_case d.run(expect_emits: 2) do @@ -289,7 +318,7 @@ def compare_test_result(events, tests, options = {}) sub_test_case 'octet counting frame' do def test_msg_size_with_tcp - d = create_driver([CONFIG, 'protocol_type tcp', 'frame_type octet_count'].join("\n")) + d = create_driver([CONFIG, " \n", 'frame_type octet_count'].join("\n")) tests = create_test_case d.run(expect_emits: 2) do @@ -305,7 +334,7 @@ def test_msg_size_with_tcp end def test_msg_size_with_same_tcp_connection - d = create_driver([CONFIG, 'protocol_type tcp', 'frame_type octet_count'].join("\n")) + d = create_driver([CONFIG, " \n", 'frame_type octet_count'].join("\n")) tests = create_test_case d.run(expect_emits: 2) do