Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Plugin development enhancement #1256

Merged
merged 5 commits into from
Oct 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/fluent/plugin/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ class Base

State = Struct.new(:configure, :start, :after_start, :stop, :before_shutdown, :shutdown, :after_shutdown, :close, :terminate)

attr_accessor :under_plugin_development

def initialize
super
@_state = State.new(false, false, false, false, false, false, false, false, false)
@under_plugin_development = false
end

def has_router?
Expand Down
7 changes: 7 additions & 0 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,13 @@ def metadata_list
end
end

# it's too dangerous, and use it so carefully to remove metadata for tests
def metadata_list_clear!
synchronize do
@metadata_list.clear
end
end

def new_metadata(timekey: nil, tag: nil, variables: nil)
Metadata.new(timekey, tag, variables)
end
Expand Down
12 changes: 12 additions & 0 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,13 @@ def metadata(tag, time, record)
end
end

def metadata_for_test(tag, time, record)
raise "BUG: #metadata_for_test is available only when no actual metadata exists" unless @buffer.metadata_list.empty?
m = metadata(tag, time, record)
@buffer.metadata_list_clear!
m
end

def execute_chunking(tag, es, enqueue: false)
if @simple_chunking
handle_stream_simple(tag, es, enqueue: enqueue)
Expand Down Expand Up @@ -976,6 +983,10 @@ def try_flush
log.debug "taking back chunk for errors.", plugin_id: plugin_id, chunk: dump_unique_id_hex(chunk.unique_id)
@buffer.takeback_chunk(chunk.unique_id)

if @under_plugin_development
raise
end

@retry_mutex.synchronize do
if @retry
@counters_monitor.synchronize{ @num_errors += 1 }
Expand Down Expand Up @@ -1120,6 +1131,7 @@ def enqueue_thread_run
@buffer.enqueue_all{ |metadata, chunk| metadata.timekey < current_timekey && metadata.timekey + timekey_unit + timekey_wait <= now_int }
end
rescue => e
raise if @under_plugin_development
log.error "unexpected error while checking flushed chunks. ignored.", plugin_id: plugin_id, error_class: e.class, error: e
log.error_backtrace
end
Expand Down
1 change: 1 addition & 0 deletions lib/fluent/test/driver/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def initialize(klass, opts: {}, &block)
else
@instance = klass
end
@instance.under_plugin_development = true

@logs = []

Expand Down
3 changes: 3 additions & 0 deletions lib/fluent/test/driver/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
require 'fluent/test/driver/event_feeder'

require 'fluent/plugin/output'
require 'timeout'

module Fluent
module Test
Expand All @@ -43,6 +44,7 @@ def run_actual(**kwargs, &block)
super(**kwargs, &block)
if @flush_buffer_at_cleanup
@instance.force_flush
Timeout.timeout(10){ sleep 0.1 until [email protected] || @instance.buffer.queue.size == 0 }
end
end

Expand All @@ -52,6 +54,7 @@ def formatted

def flush
@instance.force_flush
Timeout.timeout(10){ sleep 0.1 until [email protected] || @instance.buffer.queue.size == 0 }
end

def instance_hook_after_started
Expand Down