Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements AWS SigV4 for the HTTP output plugin. #4459

Merged
merged 4 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions fluentd.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency("tzinfo-data", ["~> 1.0"])
gem.add_runtime_dependency("strptime", [">= 0.2.4", "< 1.0.0"])
gem.add_runtime_dependency("webrick", ["~> 1.4"])
gem.add_runtime_dependency("aws-sigv4", ["~> 1.8"])
gem.add_runtime_dependency("aws-sdk-sts", ["~> 1.11"])
gem.add_runtime_dependency("rexml", ["~> 3.2"])

# gems that aren't default gems as of Ruby 3.4
gem.add_runtime_dependency("base64", ["~> 0.2"])
Expand Down
60 changes: 55 additions & 5 deletions lib/fluent/plugin/out_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
require 'fluent/tls'
require 'fluent/plugin/output'
require 'fluent/plugin_helper/socket'
require 'aws-sigv4'
require 'aws-sdk-core'

# patch Net::HTTP to support extra_chain_cert which was added in Ruby feature #9758.
# see: https://github.com/ruby/ruby/commit/31af0dafba6d3769d2a39617c0dddedb97883712
Expand Down Expand Up @@ -87,11 +89,17 @@ class RetryableResponse < StandardError; end

config_section :auth, required: false, multi: false do
desc 'The method for HTTP authentication'
config_param :method, :enum, list: [:basic], default: :basic
config_param :method, :enum, list: [:basic, :aws_sigv4], default: :basic
desc 'The username for basic authentication'
config_param :username, :string, default: nil
desc 'The password for basic authentication'
config_param :password, :string, default: nil, secret: true
desc 'The AWS service to authenticate against'
config_param :aws_service, :string, default: nil
desc 'The AWS region to use when authenticating'
config_param :aws_region, :string, default: nil
desc 'The AWS role ARN to assume when authenticating'
config_param :aws_role_arn, :string, default: nil
end

def initialize
Expand Down Expand Up @@ -121,6 +129,30 @@ def configure(conf)
end
define_singleton_method(:format, method(:format_json_array))
end

if @auth and @auth.method == :aws_sigv4

raise Fluent::ConfigError, "aws_service is required for aws_sigv4 auth" unless @auth.aws_service != nil
raise Fluent::ConfigError, "aws_region is required for aws_sigv4 auth" unless @auth.aws_region != nil

if @auth.aws_role_arn == nil
aws_credentials = Aws::CredentialProviderChain.new.resolve
else
aws_credentials = Aws::AssumeRoleCredentials.new(
client: Aws::STS::Client.new(
region: @auth.aws_region
),
role_arn: @auth.aws_role_arn,
role_session_name: "fluentd"
)
end

@aws_signer = Aws::Sigv4::Signer.new(
service: @auth.aws_service,
region: @auth.aws_region,
credentials_provider: aws_credentials
)
end
end

def multi_workers_ready?
Expand Down Expand Up @@ -215,7 +247,7 @@ def parse_endpoint(chunk)
URI.parse(endpoint)
end

def set_headers(req, chunk)
def set_headers(req, uri, chunk)
if @headers
@headers.each do |k, v|
req[k] = v
Expand All @@ -227,6 +259,7 @@ def set_headers(req, chunk)
end
end
req['Content-Type'] = @content_type
req['Host'] = uri.host
end

def create_request(chunk, uri)
Expand All @@ -236,11 +269,28 @@ def create_request(chunk, uri)
when :put
Net::HTTP::Put.new(uri.request_uri)
end
set_headers(req, uri, chunk)
req.body = @json_array ? "[#{chunk.read.chop}]" : chunk.read

if @auth
req.basic_auth(@auth.username, @auth.password)
if @auth.method == :basic
req.basic_auth(@auth.username, @auth.password)
elsif @auth.method == :aws_sigv4
signature = @aws_signer.sign_request(
http_method: req.method,
url: uri.request_uri,
headers: {
'Content-Type' => @content_type,
'Host' => uri.host
},
body: req.body
)
req.add_field('x-amz-date', signature.headers['x-amz-date'])
req.add_field('x-amz-security-token', signature.headers['x-amz-security-token'])
req.add_field('x-amz-content-sha256', signature.headers['x-amz-content-sha256'])
req.add_field('authorization', signature.headers['authorization'])
end
end
set_headers(req, chunk)
req.body = @json_array ? "[#{chunk.read.chop}]" : chunk.read
req
end

Expand Down
94 changes: 94 additions & 0 deletions test/plugin/test_out_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require 'net/http'
require 'uri'
require 'json'
require 'aws-sdk-core'

# WEBrick's ProcHandler doesn't handle PUT by default
module WEBrick::HTTPServlet
Expand Down Expand Up @@ -390,6 +391,99 @@ def test_basic_auth_with_invalid_auth
end
end


sub_test_case 'aws sigv4 auth' do
setup do
@@fake_aws_credentials = Aws::Credentials.new(
'fakeaccess',
'fakesecret',
'fake session token'
)
end

def server_port
19883
end

def test_aws_sigv4_sts_role_arn
stub(Aws::AssumeRoleCredentials).new do |credentials_provider|
stub(credentials_provider).credentials {
@@fake_aws_credentials
}
credentials_provider
end

d = create_driver(config + %[
<auth>
method aws_sigv4
aws_service someservice
aws_region my-region-1
aws_role_arn arn:aws:iam::123456789012:role/MyRole
</auth>
])
d.run(default_tag: 'test.http') do
test_events.each { |event|
d.feed(event)
}
end

result = @@result
assert_equal 'POST', result.method
assert_equal 'application/x-ndjson', result.content_type
assert_equal test_events, result.data
assert_not_empty result.headers
assert_equal '127.0.0.1', result.headers['host']
assert_not_nil result.headers['authorization']
assert_match /AWS4-HMAC-SHA256 Credential=[a-zA-Z0-9]*\/\d+\/my-region-1\/someservice\/aws4_request/, result.headers['authorization']
assert_match /SignedHeaders=content-type;host;x-amz-content-sha256;x-amz-date;x-amz-security-token/, result.headers['authorization']
assert_equal @@fake_aws_credentials.session_token, result.headers['x-amz-security-token']
assert_not_nil result.headers['x-amz-content-sha256']
assert_not_empty result.headers['x-amz-content-sha256']
assert_not_nil result.headers['x-amz-security-token']
assert_not_empty result.headers['x-amz-security-token']
assert_not_nil result.headers['x-amz-date']
assert_not_empty result.headers['x-amz-date']
end

def test_aws_sigv4_no_role
stub(Aws::CredentialProviderChain).new do |provider_chain|
stub(provider_chain).resolve {
@@fake_aws_credentials
}
provider_chain
end
d = create_driver(config + %[
<auth>
method aws_sigv4
aws_service someservice
aws_region my-region-1
</auth>
])
d.run(default_tag: 'test.http') do
test_events.each { |event|
d.feed(event)
}
end

result = @@result
assert_equal 'POST', result.method
assert_equal 'application/x-ndjson', result.content_type
assert_equal test_events, result.data
assert_not_empty result.headers
assert_equal '127.0.0.1', result.headers['host']
assert_not_nil result.headers['authorization']
assert_match /AWS4-HMAC-SHA256 Credential=[a-zA-Z0-9]*\/\d+\/my-region-1\/someservice\/aws4_request/, result.headers['authorization']
assert_match /SignedHeaders=content-type;host;x-amz-content-sha256;x-amz-date;x-amz-security-token/, result.headers['authorization']
assert_equal @@fake_aws_credentials.session_token, result.headers['x-amz-security-token']
assert_not_nil result.headers['x-amz-content-sha256']
assert_not_empty result.headers['x-amz-content-sha256']
assert_not_nil result.headers['x-amz-security-token']
assert_not_empty result.headers['x-amz-security-token']
assert_not_nil result.headers['x-amz-date']
assert_not_empty result.headers['x-amz-date']
end
end

sub_test_case 'HTTPS' do
def server_port
19882
Expand Down
Loading