-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Masahiro Nakagawa <[email protected]>
- Loading branch information
1 parent
2ec5466
commit fdc56bc
Showing
1 changed file
with
205 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,205 @@ | ||
require 'net/http' | ||
require 'uri' | ||
require 'openssl' | ||
require 'fluent/plugin/output' | ||
require 'fluent/plugin_helper/socket' | ||
|
||
module Fluent::Plugin | ||
class HTTPOutput < Output | ||
Fluent::Plugin.register_output('http', self) | ||
|
||
helpers :formatter | ||
|
||
desc 'The endpoint for HTTP request, e.g. http://example.com/api' | ||
config_param :endpoint, :string | ||
desc 'The method for HTTP request' | ||
config_param :http_method, :enum, list: [:put, :post], default: :post | ||
desc 'The proxy for HTTP request' | ||
config_param :proxy, :string, default: ENV['HTTP_PROXY'] || ENV['http_proxy'] | ||
desc 'Content-Type for HTTP request' | ||
config_param :content_type, :string, default: nil | ||
desc 'Additional headers for HTTP request' | ||
config_param :headers, :hash, default: nil | ||
|
||
desc 'The connection open timeout in seconds' | ||
config_param :open_timeout, :integer, default: nil | ||
desc 'The read timeout in seconds' | ||
config_param :read_timeout, :integer, default: nil | ||
desc 'The TLS timeout in seconds' | ||
config_param :ssl_timeout, :integer, default: nil | ||
|
||
desc 'The CA certificate path for TLS' | ||
config_param :tls_ca_cert_path, :string, default: nil | ||
desc 'The client certificate path for TLS' | ||
config_param :tls_client_cert_path, :string, default: nil | ||
desc 'The client private key path for TLS' | ||
config_param :tls_private_key_path, :string, default: nil | ||
desc 'The client private key passphrase for TLS' | ||
config_param :tls_private_key_passphrase, :string, default: nil, secret: true | ||
desc 'The verify mode of TLS' | ||
config_param :tls_verify_mode, :enum, list: [:none, :peer], default: :peer | ||
desc 'The default version of TLS' | ||
config_param :tls_version, :enum, list: Fluent::PluginHelper::Socket::TLS_SUPPORTED_VERSIONS, default: Fluent::PluginHelper::Socket::TLS_DEFAULT_VERSION | ||
desc 'The cipher configuration of TLS' | ||
config_param :tls_ciphers, :string, default: Fluent::PluginHelper::Socket::CIPHERS_DEFAULT | ||
|
||
desc 'Raise UnrecoverableError when the response is non success, 4xx/5xx' | ||
config_param :error_response_as_unrecoverable, :bool, default: true | ||
|
||
config_section :format do | ||
config_set_default :@type, 'json' | ||
end | ||
|
||
config_section :auth, required: false, multi: false do | ||
desc 'The method for HTTP authentication' | ||
config_param :method, :enum, list: [:basic], default: :basic | ||
desc 'The username for basic authentication' | ||
config_param :username, :string, default: nil | ||
desc 'The password for basic authentication' | ||
config_param :password, :string, default: nil, secret: true | ||
end | ||
|
||
def initialize | ||
super | ||
|
||
@uri = nil | ||
@proxy_uri = nil | ||
@formatter = nil | ||
end | ||
|
||
def configure(conf) | ||
super | ||
|
||
@http_opt = setup_http_option | ||
@proxy_uri = URI.parse(@proxy) if @proxy | ||
@formatter = formatter_create | ||
@content_type = setup_content_type unless @content_type | ||
end | ||
|
||
def multi_workers_ready? | ||
true | ||
end | ||
|
||
def formatted_to_msgpack_binary? | ||
@formatter_configs.first[:@type] == 'msgpack' | ||
end | ||
|
||
def format(tag, time, record) | ||
@formatter.format(tag, time, record) | ||
end | ||
|
||
def write(chunk) | ||
uri = parse_endpoint(chunk) | ||
req = create_request(chunk, uri) | ||
|
||
log.debug { "#{@http_method.capitalize} data to #{uri.to_s} with chunk(#{dump_unique_id_hex(chunk.unique_id)})" } | ||
|
||
send_request(uri, req) | ||
end | ||
|
||
def setup_content_type | ||
case @formatter_configs.first[:@type] | ||
when 'json' | ||
'application/x-ndjson' | ||
when 'csv' | ||
'text/csv' | ||
when 'tsv', 'ltsv' | ||
'text/tab-separated-values' | ||
when 'msgpack' | ||
'application/x-msgpack' | ||
when 'out_file', 'single_value', 'stdout', 'hash' | ||
'text/plain' | ||
else | ||
raise Fluent::ConfigError, "can't determind Content-Type from formatter type. Set content_type parameter explicitly" | ||
end | ||
end | ||
|
||
def setup_http_option | ||
use_ssl = URI.parse(@endpoint).scheme == 'https' | ||
opt = { | ||
:open_timeout => @open_timeout, | ||
:read_timeout => @read_timeout, | ||
:ssl_timeout => @ssl_timeout, | ||
:use_ssl => use_ssl | ||
} | ||
|
||
if use_ssl | ||
if @tls_ca_cert_path | ||
raise Fluent::ConfigError, "tls_ca_cert_path is wrong: #{@tls_ca_cert_path}" unless File.file?(@tls_ca_cert_path) | ||
opt[:ca_file] = @tls_ca_cert_path | ||
end | ||
if @tls_client_cert_path | ||
raise Fluent::ConfigError, "tls_client_cert_path is wrong: #{@tls_client_cert_path}" unless File.file?(@tls_client_cert_path) | ||
opt[:cert] = OpenSSL::X509::Certificate.new(File.read(@tls_client_cert_path)) | ||
end | ||
if @tls_private_key_path | ||
raise Fluent::ConfigError, "tls_private_key_path is wrong: #{@tls_private_key_path}" unless File.file?(@tls_private_key_path) | ||
opt[:key] = OpenSSL::PKey::RSA.new(File.read(@tls_private_key_path), @tls_private_key_passphrase) | ||
end | ||
opt[:verify_mode] = case @tls_verify_mode | ||
when :none | ||
OpenSSL::SSL::VERIFY_NONE | ||
when :peer | ||
OpenSSL::SSL::VERIFY_PEER | ||
end | ||
opt[:ciphers] = @tls_ciphers | ||
opt[:ssl_version] = @tls_version | ||
end | ||
|
||
opt | ||
end | ||
|
||
def parse_endpoint(chunk) | ||
endpoint = extract_placeholders(@endpoint, chunk) | ||
URI.parse(endpoint) | ||
end | ||
|
||
def set_headers(req) | ||
if @headers | ||
@headers.each do |k, v| | ||
req[k] = v | ||
end | ||
end | ||
req['Content-Type'] = @content_type | ||
end | ||
|
||
def create_request(chunk, uri) | ||
req = case @http_method | ||
when :post | ||
Net::HTTP::Post.new(uri.request_uri) | ||
when :put | ||
Net::HTTP::Put.new(uri.request_uri) | ||
end | ||
if @auth | ||
req.basic_auth(@auth.username, @auth.password) | ||
end | ||
set_headers(req) | ||
req.body = chunk.read | ||
req | ||
end | ||
|
||
def send_request(uri, req) | ||
res = if @proxy_uri | ||
Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt) { |http| | ||
http.request(req) | ||
} | ||
else | ||
Net::HTTP.start(uri.host, uri.port, @http_opt) { |http| | ||
http.request(req) | ||
} | ||
end | ||
|
||
if res.is_a?(Net::HTTPSuccess) | ||
log.debug { "#{res.code} #{res.message} #{res.body}" } | ||
else | ||
msg = "#{res.code} #{res.message} #{res.body}" | ||
|
||
if @error_response_as_unrecoverable | ||
raise Fluent::UnrecoverableError, msg | ||
else | ||
log.error "got error response from '#{@http_method.capitalize} #{uri.to_s}' : #{msg}" | ||
end | ||
end | ||
end | ||
end | ||
end |