Revert "Support overflow_action in v14 api" #959

Merged (1 commit) on May 17, 2016
Changes from all commits
4 changes: 2 additions & 2 deletions lib/fluent/compat/output.rb
@@ -189,7 +189,7 @@ def support_in_v12_style?(feature)
desc 'The length limit of the chunk queue.'
config_param :buffer_queue_limit, :integer, default: 256
desc 'The action when the size of buffer queue exceeds the buffer_queue_limit.'
config_param :buffer_queue_full_action, :enum, list: [:exception, :block, :drop_oldest_chunk], default: :exception
config_param :buffer_queue_full_action, :enum, list: [:exception, :block], default: :exception

config_param :flush_at_shutdown, :bool, default: true

@@ -204,7 +204,7 @@ def support_in_v12_style?(feature)
"max_retry_wait" => "retry_max_interval",
"buffer_chunk_limit" => "chunk_bytes_limit",
"buffer_queue_limit" => "queue_length_limit",
"buffer_queue_full_action" => "overflow_action",
"buffer_queue_full_action" => nil, # TODO: implement this on fluent/plugin/buffer
"flush_at_shutdown" => "flush_at_shutdown",
}

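For reference, the rename map in the hunk above translates v0.12 buffer parameter names into their v0.14 equivalents; after this revert, the nil entry means buffer_queue_full_action is still recognized but has no v14 counterpart until the buffer-side implementation lands. A minimal sketch of how such a map could be applied (translate_v12_params and params_map are illustrative names, not fluentd's actual compat API):

```ruby
# Hypothetical helper: apply a v0.12 -> v0.14 rename map to a config hash.
# A nil mapping means the old parameter is accepted but silently dropped.
params_map = {
  "buffer_queue_limit"       => "queue_length_limit",
  "buffer_queue_full_action" => nil, # dropped by this revert
}

def translate_v12_params(conf, params_map)
  conf.each_with_object({}) do |(key, value), out|
    if params_map.key?(key)
      new_key = params_map[key]
      out[new_key] = value if new_key # skip parameters mapped to nil
    else
      out[key] = value # pass unknown keys through untouched
    end
  end
end

translate_v12_params(
  { "buffer_queue_limit" => 256, "buffer_queue_full_action" => "block" },
  params_map
)
# => { "queue_length_limit" => 256 }
```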
252 changes: 102 additions & 150 deletions lib/fluent/plugin/buffer.rb
@@ -36,8 +36,6 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than
DEFAULT_CHUNK_BYTES_LIMIT = 8 * 1024 * 1024 # 8MB
DEFAULT_TOTAL_BYTES_LIMIT = 512 * 1024 * 1024 # 512MB, same as v0.12 (BufferedOutput + buf_memory: 64 x 8MB)

DEFAULT_CHUNK_FULL_THRESHOLD = 0.95

configured_in :buffer

# TODO: system total buffer bytes limit by SystemConfig
@@ -52,9 +50,6 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than
# optional new limitations
config_param :chunk_records_limit, :integer, default: nil

# if chunk size (or records) is 95% or more after #write, then that chunk will be enqueued
config_param :chunk_full_threshold, :float, default: DEFAULT_CHUNK_FULL_THRESHOLD

Metadata = Struct.new(:timekey, :tag, :variables)

# for tests
@@ -173,66 +168,77 @@ def metadata(timekey: nil, tag: nil, variables: nil)

# metadata MUST have consistent object_id for each variation
# data MUST be Array of serialized events
# metadata_and_data MUST be a hash of { metadata => data }
def write(metadata_and_data, bulk: false, enqueue: false)
return if metadata_and_data.size < 1
def emit(metadata, data, force: false)
return if data.size < 1
raise BufferOverflowError unless storable?

staged_bytesize = 0
operated_chunks = []

begin
metadata_and_data.each do |metadata, data|
write_once(metadata, data, bulk: bulk) do |chunk, adding_bytesize|
chunk.mon_enter # add lock to prevent to be committed/rollbacked from other threads
operated_chunks << chunk
staged_bytesize += adding_bytesize
end
end

return if operated_chunks.empty?
stored = false

first_chunk = operated_chunks.shift
# Following commits for other chunks can also finish successfully if the first commit operation
# finishes without any exceptions.
# In most cases, #commit requires very little disk space, so the major failure reasons are
# permission errors, disk failures, and other permanent (fatal) errors.
# the case whole data can be stored in staged chunk: almost all emits will succeed
chunk = synchronize { @stage[metadata] ||= generate_chunk(metadata) }
original_bytesize = chunk.bytesize
chunk.synchronize do
begin
first_chunk.commit
enqueue_chunk(first_chunk.metadata) if enqueue || chunk_size_full?(first_chunk)
first_chunk.mon_exit
chunk.append(data)
if !chunk_size_over?(chunk) || force
chunk.commit
stored = true
@stage_size += (chunk.bytesize - original_bytesize)
else
chunk.rollback
end
rescue
operated_chunks.unshift(first_chunk)
chunk.rollback
raise
end
end
return if stored

errors = []
# Buffer plugin assumes there's no serious error cause: will commit all chunks either way
operated_chunks.each do |chunk|
begin
chunk.commit
enqueue_chunk(chunk.metadata) if enqueue || chunk_size_full?(chunk)
chunk.mon_exit
rescue => e
chunk.rollback
chunk.mon_exit
errors << e
# try step-by-step appending if data can't be stored into an existing chunk
emit_step_by_step(metadata, data)
end

def emit_bulk(metadata, bulk, size)
return if bulk.nil? || bulk.empty?
raise BufferOverflowError unless storable?

stored = false
synchronize do # critical section for buffer (stage/queue)
until stored
chunk = @stage[metadata]
unless chunk
chunk = @stage[metadata] = generate_chunk(metadata)
end
end

@stage_size += staged_bytesize
chunk.synchronize do # critical section for chunk (chunk append/commit/rollback)
begin
empty_chunk = chunk.empty?
chunk.concat(bulk, size)

if chunk_size_over?(chunk)
if empty_chunk
log.warn "chunk bytes limit exceeds for a bulk event stream: #{bulk.bytesize}bytes"
else
chunk.rollback
enqueue_chunk(metadata)
next
end
end

if errors.size > 0
log.warn "error occurs in committing chunks: only first one raised", errors: errors.map(&:class)
raise errors.first
end
rescue
operated_chunks.each do |chunk|
chunk.rollback rescue nil # nothing possible to do for #rollback failure
chunk.mon_exit rescue nil # this may raise ThreadError for chunks already committed
chunk.commit
stored = true
@stage_size += bulk.bytesize
if chunk_size_full?(chunk)
enqueue_chunk(metadata)
end
rescue
chunk.rollback
raise
end
end
end
raise
end
nil
end

def queued_records
@@ -352,118 +358,64 @@ def chunk_size_over?(chunk)
end

def chunk_size_full?(chunk)
chunk.bytesize >= @chunk_bytes_limit * @chunk_full_threshold || (@chunk_records_limit && chunk.size >= @chunk_records_limit * @chunk_full_threshold)
chunk.bytesize >= @chunk_bytes_limit || (@chunk_records_limit && chunk.size >= @chunk_records_limit)
end

class ShouldRetry < StandardError; end

def write_once(metadata, data, bulk: false, &block)
return if !bulk && (data.nil? || data.empty?)
return if bulk && (data.empty? || data.first.nil? || data.first.empty?)
def emit_step_by_step(metadata, data)
attempt_records = data.size / 3

stored = false
adding_bytesize = nil

chunk = synchronize { @stage[metadata] ||= generate_chunk(metadata) }

chunk.synchronize do
# retry this method if chunk is already queued (between getting chunk and entering critical section)
raise ShouldRetry unless chunk.staged?

empty_chunk = chunk.empty?

original_bytesize = chunk.bytesize
begin
if bulk
content, size = data
chunk.concat(content, size)
else
chunk.append(data)
synchronize do # critical section for buffer (stage/queue)
while data.size > 0
if attempt_records < MINIMUM_APPEND_ATTEMPT_RECORDS
attempt_records = MINIMUM_APPEND_ATTEMPT_RECORDS
end
adding_bytesize = chunk.bytesize - original_bytesize

if chunk_size_over?(chunk)
if empty_chunk && bulk
log.warn "chunk bytes limit exceeds for a bulk event stream: #{bulk.bytesize}bytes"
stored = true
else
chunk.rollback
end
else
stored = true
chunk = @stage[metadata]
unless chunk
chunk = @stage[metadata] = generate_chunk(metadata)
end
rescue
chunk.rollback
raise
end

if stored
block.call(chunk, adding_bytesize)
elsif bulk
# this metadata might be enqueued already by other threads
# but #enqueue_chunk does nothing in such case
enqueue_chunk(metadata)
raise ShouldRetry
end
end

unless stored
# try step-by-step appending if data can't be stored into an existing chunk in non-bulk mode
write_step_by_step(metadata, data, data.size / 3, &block)
end
rescue ShouldRetry
retry
end

def write_step_by_step(metadata, data, attempt_records, &block)
while data.size > 0
if attempt_records < MINIMUM_APPEND_ATTEMPT_RECORDS
attempt_records = MINIMUM_APPEND_ATTEMPT_RECORDS
end

chunk = synchronize{ @stage[metadata] ||= generate_chunk(metadata) }
chunk.synchronize do # critical section for chunk (chunk append/commit/rollback)
raise ShouldRetry unless chunk.staged?
begin
empty_chunk = chunk.empty?
original_bytesize = chunk.bytesize

attempt = data.slice(0, attempt_records)
chunk.append(attempt)
adding_bytesize = (chunk.bytesize - original_bytesize)

if chunk_size_over?(chunk)
chunk.rollback

if attempt_records <= MINIMUM_APPEND_ATTEMPT_RECORDS
if empty_chunk # record is too large even for empty chunk
raise BufferChunkOverflowError, "minimum append batch exceeds chunk bytes limit"
chunk.synchronize do # critical section for chunk (chunk append/commit/rollback)
begin
empty_chunk = chunk.empty?
original_bytesize = chunk.bytesize

attempt = data.slice(0, attempt_records)
chunk.append(attempt)

if chunk_size_over?(chunk)
chunk.rollback

if attempt_records <= MINIMUM_APPEND_ATTEMPT_RECORDS
if empty_chunk # record is too large even for empty chunk
raise BufferChunkOverflowError, "minimum append batch exceeds chunk bytes limit"
end
# no more records for this chunk -> enqueue -> to be flushed
enqueue_chunk(metadata) # `chunk` will be removed from stage
attempt_records = data.size # fresh chunk may have enough space
else
# whole data can be processed in two operations
# (by using attempt /= 2, 3 operations are required for odd numbers of data)
attempt_records = (attempt_records / 2) + 1
end
# no more records for this chunk -> enqueue -> to be flushed
enqueue_chunk(metadata) # `chunk` will be removed from stage
attempt_records = data.size # fresh chunk may have enough space
else
# whole data can be processed in two operations
# (by using attempt /= 2, 3 operations are required for odd numbers of data)
attempt_records = (attempt_records / 2) + 1

next
end

next
chunk.commit
@stage_size += (chunk.bytesize - original_bytesize)
data.slice!(0, attempt_records)
# same attempt size
nil # discard return value of data.slice!() immediately
rescue
chunk.rollback
raise
end

block.call(chunk, adding_bytesize)
data.slice!(0, attempt_records)
# same attempt size
nil # discard return value of data.slice!() immediately
rescue
chunk.rollback
raise
end
end
end
rescue ShouldRetry
retry
end # write_step_by_step
nil
end # emit_step_by_step
end
end
end
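The emit_step_by_step path restored above slices an event stream into progressively smaller batches until each one fits into a chunk: start at a third of the records, halve the batch (plus one) on overflow, and swap in a fresh chunk once the staged one fills up. A standalone sketch of that loop (ToyChunk and the constant's value of 10 are assumptions for illustration, not fluentd internals):

```ruby
# Toy model of the halving append strategy in emit_step_by_step.
MINIMUM_APPEND_ATTEMPT_RECORDS = 10 # assumed value; the diff does not show it

class ToyChunk
  attr_reader :records

  def initialize(bytes_limit)
    @bytes_limit = bytes_limit
    @records = []
  end

  def empty?
    @records.empty?
  end

  def bytesize
    @records.sum(&:bytesize)
  end

  def append(recs)
    @records.concat(recs)
  end

  def rollback(count)
    @records.pop(count) # undo the last append attempt
  end

  def over?
    bytesize > @bytes_limit
  end
end

def append_step_by_step(chunk, data, bytes_limit)
  attempt_records = [data.size / 3, MINIMUM_APPEND_ATTEMPT_RECORDS].max
  until data.empty?
    attempt = data.first(attempt_records)
    chunk.append(attempt)
    if chunk.over?
      chunk.rollback(attempt.size)
      if attempt_records <= MINIMUM_APPEND_ATTEMPT_RECORDS
        raise "record too large even for an empty chunk" if chunk.empty?
        chunk = ToyChunk.new(bytes_limit) # stands in for enqueue_chunk + a fresh staged chunk
        attempt_records = data.size       # a fresh chunk may have enough space
      else
        attempt_records = (attempt_records / 2) + 1 # halve and retry
      end
    else
      data.shift(attempt_records) # committed: drop the stored records
    end
  end
  chunk
end

last = append_step_by_step(ToyChunk.new(128), Array.new(20) { "0123456789" }, 128)
last.records.size # => 10 (two 10-record chunks produced overall)
```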
26 changes: 5 additions & 21 deletions lib/fluent/plugin/buffer/chunk.rb
@@ -51,15 +51,12 @@ def initialize(metadata)
@unique_id = generate_unique_id
@metadata = metadata

# state: staged/queued/closed
@state = :staged

@size = 0
@created_at = Time.now
@modified_at = Time.now
end

attr_reader :unique_id, :metadata, :created_at, :modified_at, :state
attr_reader :unique_id, :metadata, :created_at, :modified_at

# data is array of formatted record string
def append(data)
@@ -92,28 +89,15 @@ def empty?
size == 0
end

def staged?
@state == :staged
end

def queued?
@state == :queued
end

def closed?
@state == :closed
end

def enqueued!
@state = :queued
end
## method for post-process of enqueue (e.g., renaming file for file chunks)
# def enqueued!

def close
@state = :closed
raise NotImplementedError, "Implement this method in child class"
end

def purge
@state = :closed
raise NotImplementedError, "Implement this method in child class"
end

def read
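The chunk.rb side of the revert removes the staged/queued/closed lifecycle, turning enqueued! back into a bare post-enqueue hook and letting close and purge raise NotImplementedError again. A toy sketch of the small state machine the removed lines had implemented (ChunkLifecycle is an illustrative name, not fluentd's class):

```ruby
# Toy model of the removed chunk lifecycle: staged -> queued -> closed.
class ChunkLifecycle
  attr_reader :state

  def initialize
    @state = :staged # every new chunk starts on the stage
  end

  def staged?
    @state == :staged
  end

  def queued?
    @state == :queued
  end

  def closed?
    @state == :closed
  end

  def enqueued!
    @state = :queued # hook point, e.g. renaming a file-backed chunk
  end

  def close
    @state = :closed # terminal state, reached on flush or purge
  end
end

c = ChunkLifecycle.new
c.enqueued!
c.close
c.closed? # => true
```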