Skip to content

Commit

Permalink
Merge pull request #111 from aianus/aianus/kinesis_device
Browse files Browse the repository at this point in the history
Add support for logging to AWS Kinesis (basically a hosted Kafka)
  • Loading branch information
dwbutler authored Jan 6, 2017
2 parents 5874e07 + 5676059 commit ca5e395
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 1 deletion.
26 changes: 25 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ writing to a file or syslog since logstash can receive the structured data direc
## Features

* Can write directly to a logstash listener over a UDP or TCP/SSL connection.
* Can write to a file, Redis, Kafka, a unix socket, syslog, stdout, or stderr.
* Can write to a file, Redis, Kafka, Kinesis, a unix socket, syslog, stdout, or stderr.
* Logger can take a string message, a hash, a `LogStash::Event`, an object, or a JSON string as input.
* Events are automatically populated with message, timestamp, host, and severity.
* Writes in logstash JSON format, but supports other formats as well.
Expand Down Expand Up @@ -491,6 +491,30 @@ config.logstash.backoff = 1

```

#### Kinesis

Add the aws-sdk gem to your Gemfile:

gem 'aws-sdk'

```ruby
# Required
config.logstash.type = :kinesis

# Optional, will default to the 'logstash' stream
config.logstash.stream = 'my-stream-name'

# Optional, will default to 'us-east-1'
config.logstash.aws_region = 'us-west-2'

# Optional, will default to the AWS_ACCESS_KEY_ID environment variable
config.logstash.aws_access_key_id = 'ASKASKHLD12341'

# Optional, will default to the AWS_SECRET_ACCESS_KEY environment variable
config.logstash.aws_secret_access_key = 'ASKASKHLD1234123412341234'

```

#### File

```ruby
Expand Down
2 changes: 2 additions & 0 deletions lib/logstash-logger/device.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module Device
autoload :Unix, 'logstash-logger/device/unix'
autoload :Redis, 'logstash-logger/device/redis'
autoload :Kafka, 'logstash-logger/device/kafka'
autoload :Kinesis, 'logstash-logger/device/kinesis'
autoload :File, 'logstash-logger/device/file'
autoload :IO, 'logstash-logger/device/io'
autoload :Stdout, 'logstash-logger/device/stdout'
Expand Down Expand Up @@ -51,6 +52,7 @@ def self.device_klass_for(type)
when :file then File
when :redis then Redis
when :kafka then Kafka
when :kinesis then Kinesis
when :io then IO
when :stdout then Stdout
when :stderr then Stderr
Expand Down
79 changes: 79 additions & 0 deletions lib/logstash-logger/device/kinesis.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
require 'aws-sdk'

module LogStashLogger
module Device
class Kinesis < Connectable

DEFAULT_REGION = 'us-east-1'
DEFAULT_STREAM = 'logstash'
RECOVERABLE_ERROR_CODES = [
"ServiceUnavailable",
"Throttling",
"RequestExpired",
"ProvisionedThroughputExceededException"
]

attr_accessor :aws_region, :stream

def initialize(opts)
super
@access_key_id = opts[:aws_access_key_id] || ENV['AWS_ACCESS_KEY_ID']
@secret_access_key = opts[:aws_secret_access_key] || ENV['AWS_SECRET_ACCESS_KEY']
@aws_region = opts[:aws_region] || DEFAULT_REGION
@stream = opts[:stream] || DEFAULT_STREAM
end

def connect
@io = ::Aws::Kinesis::Client.new(
region: @aws_region,
credentials: ::Aws::Credentials.new(@access_key_id, @secret_access_key)
)
end

def with_connection
connect unless connected?
yield
rescue => e
log_error(e)
log_warning("giving up")
end

def write_batch(messages, group = nil)
kinesis_records = messages.map do |message|
{
data: message,
partition_key: SecureRandom.uuid
}
end

with_connection do
resp = @io.put_records({
records: kinesis_records,
stream_name: @stream
})

# Put any failed records back into the buffer
if resp.failed_record_count != 0
resp.records.each_with_index do |record, index|
if RECOVERABLE_ERROR_CODES.include?(record.error_code)
log_warning("Failed to post record to kinesis with error: #{record.error_code} #{record.error_message}")
log_warning("Retrying")
write(kinesis_records[index][:data])
elsif !record.error_code.nil? && record.error_code != ''
log_error("Failed to post record to kinesis with error: #{record.error_code} #{record.error_message}")
end
end
end
end
end

def write_one(message)
write_batch([message])
end

def close!
@io = nil
end
end
end
end
1 change: 1 addition & 0 deletions logstash-logger.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Gem::Specification.new do |gem|
end
gem.add_development_dependency 'redis'
gem.add_development_dependency 'poseidon'
gem.add_development_dependency 'aws-sdk'

if RUBY_VERSION < '2' || defined?(JRUBY_VERSION)
gem.add_development_dependency 'SyslogLogger'
Expand Down
45 changes: 45 additions & 0 deletions spec/device/kinesis_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
require 'logstash-logger'

describe LogStashLogger::Device::Kinesis do
include_context 'device'

let(:client) { double("Aws::Kinesis::Client") }

before(:each) do
allow(Aws::Kinesis::Client).to receive(:new) { client }
end

it "writes to a Kinesis stream" do
response = ::Aws::Kinesis::Types::PutRecordsOutput.new
response.failed_record_count = 0
response.records = []
expect(client).to receive(:put_records) { response }
kinesis_device.write "foo"

expect(kinesis_device).to be_connected
kinesis_device.close!
expect(kinesis_device).not_to be_connected
end

it "it puts records with recoverable errors back in the buffer" do
failed_record = ::Aws::Kinesis::Types::PutRecordsResultEntry.new
failed_record.error_code = "ProvisionedThroughputExceededException"
failed_record.error_message = "ProvisionedThroughputExceededException"
response = ::Aws::Kinesis::Types::PutRecordsOutput.new
response.failed_record_count = 1
response.records = [failed_record]

expect(client).to receive(:put_records) { response }
expect(kinesis_device).to receive(:write).with("foo")

kinesis_device.write_one "foo"
end

it "defaults the AWS region to us-east-1" do
expect(kinesis_device.aws_region).to eq('us-east-1')
end

it "defaults the kinesis stream to logstash" do
expect(kinesis_device.stream).to eq('logstash')
end
end
1 change: 1 addition & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def connection_type

let(:redis_device) { LogStashLogger::Device.new(type: :redis, sync: true) }
let(:kafka_device) { LogStashLogger::Device.new(type: :kafka, sync: true) }
let(:kinesis_device) { LogStashLogger::Device.new(type: :kinesis, sync: true) }

let(:outputs) { [{type: :stdout}, {type: :io, io: io}] }
let(:multi_delegator_device) { LogStashLogger::Device.new(type: :multi_delegator, outputs: outputs) }
Expand Down

0 comments on commit ca5e395

Please sign in to comment.