From fdc56bcf925df537850c9bd8330a0ce1ace84f1d Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Wed, 10 Jul 2019 02:57:24 +0900 Subject: [PATCH] Add out_http plugin Signed-off-by: Masahiro Nakagawa --- lib/fluent/plugin/out_http.rb | 205 ++++++++++++++++++++++++++++++++++ 1 file changed, 205 insertions(+) create mode 100644 lib/fluent/plugin/out_http.rb diff --git a/lib/fluent/plugin/out_http.rb b/lib/fluent/plugin/out_http.rb new file mode 100644 index 0000000000..dc56854fb9 --- /dev/null +++ b/lib/fluent/plugin/out_http.rb @@ -0,0 +1,205 @@ +require 'net/http' +require 'uri' +require 'openssl' +require 'fluent/plugin/output' +require 'fluent/plugin_helper/socket' + +module Fluent::Plugin + class HTTPOutput < Output + Fluent::Plugin.register_output('http', self) + + helpers :formatter + + desc 'The endpoint for HTTP request, e.g. http://example.com/api' + config_param :endpoint, :string + desc 'The method for HTTP request' + config_param :http_method, :enum, list: [:put, :post], default: :post + desc 'The proxy for HTTP request' + config_param :proxy, :string, default: ENV['HTTP_PROXY'] || ENV['http_proxy'] + desc 'Content-Type for HTTP request' + config_param :content_type, :string, default: nil + desc 'Additional headers for HTTP request' + config_param :headers, :hash, default: nil + + desc 'The connection open timeout in seconds' + config_param :open_timeout, :integer, default: nil + desc 'The read timeout in seconds' + config_param :read_timeout, :integer, default: nil + desc 'The TLS timeout in seconds' + config_param :ssl_timeout, :integer, default: nil + + desc 'The CA certificate path for TLS' + config_param :tls_ca_cert_path, :string, default: nil + desc 'The client certificate path for TLS' + config_param :tls_client_cert_path, :string, default: nil + desc 'The client private key path for TLS' + config_param :tls_private_key_path, :string, default: nil + desc 'The client private key passphrase for TLS' + config_param :tls_private_key_passphrase, :string, default: nil, secret: true + desc 'The verify mode of TLS' + config_param :tls_verify_mode, :enum, list: [:none, :peer], default: :peer + desc 'The default version of TLS' + config_param :tls_version, :enum, list: Fluent::PluginHelper::Socket::TLS_SUPPORTED_VERSIONS, default: Fluent::PluginHelper::Socket::TLS_DEFAULT_VERSION + desc 'The cipher configuration of TLS' + config_param :tls_ciphers, :string, default: Fluent::PluginHelper::Socket::CIPHERS_DEFAULT + + desc 'Raise UnrecoverableError when the response is non success, 4xx/5xx' + config_param :error_response_as_unrecoverable, :bool, default: true + + config_section :format do + config_set_default :@type, 'json' + end + + config_section :auth, required: false, multi: false do + desc 'The method for HTTP authentication' + config_param :method, :enum, list: [:basic], default: :basic + desc 'The username for basic authentication' + config_param :username, :string, default: nil + desc 'The password for basic authentication' + config_param :password, :string, default: nil, secret: true + end + + def initialize + super + + @uri = nil + @proxy_uri = nil + @formatter = nil + end + + def configure(conf) + super + + @http_opt = setup_http_option + @proxy_uri = URI.parse(@proxy) if @proxy + @formatter = formatter_create + @content_type = setup_content_type unless @content_type + end + + def multi_workers_ready? + true + end + + def formatted_to_msgpack_binary? + @formatter_configs.first[:@type] == 'msgpack' + end + + def format(tag, time, record) + @formatter.format(tag, time, record) + end + + def write(chunk) + uri = parse_endpoint(chunk) + req = create_request(chunk, uri) + + log.debug { "#{@http_method.capitalize} data to #{uri.to_s} with chunk(#{dump_unique_id_hex(chunk.unique_id)})" } + + send_request(uri, req) + end + + def setup_content_type + case @formatter_configs.first[:@type] + when 'json' + 'application/x-ndjson' + when 'csv' + 'text/csv' + when 'tsv', 'ltsv' + 'text/tab-separated-values' + when 'msgpack' + 'application/x-msgpack' + when 'out_file', 'single_value', 'stdout', 'hash' + 'text/plain' + else + raise Fluent::ConfigError, "can't determind Content-Type from formatter type. Set content_type parameter explicitly" + end + end + + def setup_http_option + use_ssl = URI.parse(@endpoint).scheme == 'https' + opt = { + :open_timeout => @open_timeout, + :read_timeout => @read_timeout, + :ssl_timeout => @ssl_timeout, + :use_ssl => use_ssl + } + + if use_ssl + if @tls_ca_cert_path + raise Fluent::ConfigError, "tls_ca_cert_path is wrong: #{@tls_ca_cert_path}" unless File.file?(@tls_ca_cert_path) + opt[:ca_file] = @tls_ca_cert_path + end + if @tls_client_cert_path + raise Fluent::ConfigError, "tls_client_cert_path is wrong: #{@tls_client_cert_path}" unless File.file?(@tls_client_cert_path) + opt[:cert] = OpenSSL::X509::Certificate.new(File.read(@tls_client_cert_path)) + end + if @tls_private_key_path + raise Fluent::ConfigError, "tls_private_key_path is wrong: #{@tls_private_key_path}" unless File.file?(@tls_private_key_path) + opt[:key] = OpenSSL::PKey::RSA.new(File.read(@tls_private_key_path), @tls_private_key_passphrase) + end + opt[:verify_mode] = case @tls_verify_mode + when :none + OpenSSL::SSL::VERIFY_NONE + when :peer + OpenSSL::SSL::VERIFY_PEER + end + opt[:ciphers] = @tls_ciphers + opt[:ssl_version] = @tls_version + end + + opt + end + + def parse_endpoint(chunk) + endpoint = extract_placeholders(@endpoint, chunk) + URI.parse(endpoint) + end + + def set_headers(req) + if @headers + @headers.each do |k, v| + req[k] = v + end + end + req['Content-Type'] = @content_type + end + + def create_request(chunk, uri) + req = case @http_method + when :post + Net::HTTP::Post.new(uri.request_uri) + when :put + Net::HTTP::Put.new(uri.request_uri) + end + if @auth + req.basic_auth(@auth.username, @auth.password) + end + set_headers(req) + req.body = chunk.read + req + end + + def send_request(uri, req) + res = if @proxy_uri + Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt) { |http| + http.request(req) + } + else + Net::HTTP.start(uri.host, uri.port, @http_opt) { |http| + http.request(req) + } + end + + if res.is_a?(Net::HTTPSuccess) + log.debug { "#{res.code} #{res.message} #{res.body}" } + else + msg = "#{res.code} #{res.message} #{res.body}" + + if @error_response_as_unrecoverable + raise Fluent::UnrecoverableError, msg + else + log.error "got error response from '#{@http_method.capitalize} #{uri.to_s}' : #{msg}" + end + end + end + end +end