Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Fluent::Plugin::Base#after_start to detect end of #start #1190

Merged
merged 3 commits into from
Aug 29, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions lib/fluent/plugin/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ class Base
include Configurable
include SystemConfig::Mixin

State = Struct.new(:configure, :start, :stop, :before_shutdown, :shutdown, :after_shutdown, :close, :terminate)
State = Struct.new(:configure, :start, :after_start, :stop, :before_shutdown, :shutdown, :after_shutdown, :close, :terminate)

def initialize
super
@_state = State.new(false, false, false, false, false, false, false, false)
@_state = State.new(false, false, false, false, false, false, false, false, false)
end

def has_router?
Expand All @@ -37,7 +37,7 @@ def has_router?

def configure(conf)
super
@_state ||= State.new(false, false, false, false, false, false, false, false)
@_state ||= State.new(false, false, false, false, false, false, false, false, false)
@_state.configure = true
self
end
Expand All @@ -47,6 +47,11 @@ def start
self
end

def after_start
@_state.after_start = true
self
end

def stop
@_state.stop = true
self
Expand Down Expand Up @@ -85,6 +90,10 @@ def started?
@_state.start
end

def after_started?
@_state.after_start
end

def stopped?
@_state.stop
end
Expand Down
15 changes: 15 additions & 0 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,11 @@ def start
@secondary.start if @secondary
end

def after_start
super
@secondary.after_start if @secondary
end

def stop
@secondary.stop if @secondary
@buffer.stop if @buffering && @buffer
Expand Down Expand Up @@ -922,6 +927,11 @@ def enqueue_thread_run
interval = @buffer_config.flush_thread_interval
end

while !self.after_started? && !self.stopped?
sleep 0.5
end
log.debug "enqueue_thread actually running"

begin
while @output_flush_threads_running
now_int = Time.now.to_i
Expand Down Expand Up @@ -969,6 +979,11 @@ def flush_thread_run(state)
clock_id = Process::CLOCK_MONOTONIC rescue Process::CLOCK_MONOTONIC_RAW
state.next_time = Process.clock_gettime(clock_id) + flush_thread_interval

while !self.after_started? && !self.stopped?
sleep 0.5
end
log.debug "flush_thread actually running"

begin
# This thread don't use `thread_current_running?` because this thread should run in `before_shutdown` phase
while @output_flush_threads_running
Expand Down
3 changes: 3 additions & 0 deletions lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ def start
lifecycle(desc: true) do |i| # instance
i.start unless i.started?
end
lifecycle(desc: true) do |i|
i.after_start unless i.after_started?
end
end

def flush!
Expand Down
1 change: 1 addition & 0 deletions lib/fluent/test/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def configure(str, use_v1 = false)
# num_waits is for checking thread status. This will be removed after improved plugin API
def run(num_waits = 10, &block)
@instance.start
@instance.after_start
begin
# wait until thread starts
num_waits.times { sleep 0.05 }
Expand Down
3 changes: 3 additions & 0 deletions lib/fluent/test/driver/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ def instance_start
@instance.start
instance_hook_after_started
end
unless @instance.after_started?
@instance.after_start
end
end

def instance_hook_after_started
Expand Down
12 changes: 12 additions & 0 deletions test/plugin/test_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ def waiting(seconds)
assert [email protected]?
@i.start
assert @i.started?
assert [email protected]_started?
@i.after_start
assert @i.after_started?
assert [email protected]?
@i.stop
assert @i.stopped?
Expand Down Expand Up @@ -329,6 +332,7 @@ def waiting(seconds)
i.register(:process){|tag, es| process_called = true }
i.configure(config_element())
i.start
i.after_start

t = event_time()
i.emit_events('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]))
Expand All @@ -344,6 +348,7 @@ def waiting(seconds)
i.register(:format){|tag, time, record| format_called_times += 1; '' }
i.configure(config_element())
i.start
i.after_start

t = event_time()
i.emit_events('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]))
Expand All @@ -364,6 +369,7 @@ def waiting(seconds)
i.configure(config_element())
i.register(:prefer_buffered_processing){ false } # delayed decision is possible to change after (output's) configure
i.start
i.after_start

assert !i.prefer_buffered_processing

Expand All @@ -389,6 +395,7 @@ def waiting(seconds)
i.configure(config_element())
i.register(:prefer_buffered_processing){ true } # delayed decision is possible to change after (output's) configure
i.start
i.after_start

assert i.prefer_buffered_processing

Expand All @@ -408,6 +415,7 @@ def waiting(seconds)

i.configure(config_element('ROOT', '', {}, [config_element('buffer', '', {"flush_mode" => "immediate"})]))
i.start
i.after_start

t = event_time()
i.emit_events('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]))
Expand All @@ -427,6 +435,7 @@ def waiting(seconds)

i.configure(config_element('ROOT', '', {}, [config_element('buffer', '', {"flush_mode" => "immediate"})]))
i.start
i.after_start

t = event_time()
i.emit_events('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]))
Expand All @@ -449,6 +458,7 @@ def waiting(seconds)
i.configure(config_element('ROOT', '', {}, [config_element('buffer', '', {"flush_mode" => "immediate"})]))
i.register(:prefer_delayed_commit){ false } # delayed decision is possible to change after (output's) configure
i.start
i.after_start

assert !i.prefer_delayed_commit

Expand All @@ -474,6 +484,7 @@ def waiting(seconds)
i.configure(config_element('ROOT', '', {}, [config_element('buffer', '', {"flush_mode" => "immediate"})]))
i.register(:prefer_delayed_commit){ true } # delayed decision is possible to change after (output's) configure
i.start
i.after_start

assert i.prefer_delayed_commit

Expand Down Expand Up @@ -542,6 +553,7 @@ def waiting(seconds)
@i.register(:process){|tag, es| ary << [tag, es] }
@i.configure(config_element())
@i.start
@i.after_start

t = event_time()
es = Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])
Expand Down
13 changes: 13 additions & 0 deletions test/plugin/test_output_as_buffered.rb
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ def waiting(seconds)
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_keys,@hash)]))
logs = @i.log.out.logs.dup
@i.start
@i.after_start
assert{ logs.select{|log| log.include?('[warn]') }.size == 0 }
end

Expand All @@ -154,6 +155,7 @@ def waiting(seconds)
logs = @i.log.out.logs.dup

@i.start # this calls `log.reset`... capturing logs about configure must be done before this line
@i.after_start
assert_equal ['key1', 'key2', 'key3', 'key4'], @i.chunk_keys

assert{ logs.select{|log| log.include?('[warn]: many chunk keys specified, and it may cause too many chunks on your system.') }.size == 1 }
Expand All @@ -164,6 +166,7 @@ def waiting(seconds)
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_keys,@hash)]))
logs = @i.log.out.logs.dup
@i.start # this calls `log.reset`... capturing logs about configure must be done before this line
@i.after_start
assert{ logs.select{|log| log.include?('[warn]: many chunk keys specified, and it may cause too many chunks on your system.') }.size == 1 }
end

Expand All @@ -172,6 +175,7 @@ def waiting(seconds)
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_keys,@hash)]))
logs = @i.log.out.logs.dup
@i.start
@i.after_start
assert{ logs.select{|log| log.include?('[warn]') }.size == 0 }
end
end
Expand All @@ -187,6 +191,7 @@ def waiting(seconds)
@i = create_output(:buffered)
@i.configure(config_element('ROOT','',{},[config_element('buffer','',hash)]))
@i.start
@i.after_start
end

test '#start does not create enqueue thread, but creates flush threads' do
Expand Down Expand Up @@ -289,6 +294,7 @@ def waiting(seconds)
@i = create_output(:buffered)
@i.configure(config_element('ROOT','',{},[config_element('buffer','',hash)]))
@i.start
@i.after_start
end

test '#start creates enqueue thread and flush threads' do
Expand Down Expand Up @@ -398,6 +404,7 @@ def waiting(seconds)
@i = create_output(:buffered)
@i.configure(config_element('ROOT','',{},[config_element('buffer','',hash)]))
@i.start
@i.after_start
end

test '#start does not create enqueue thread, but creates flush threads' do
Expand Down Expand Up @@ -491,6 +498,7 @@ def waiting(seconds)
@i = create_output(:buffered)
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.start
@i.after_start
end

test '#configure raises config error if timekey is not specified' do
Expand Down Expand Up @@ -706,6 +714,7 @@ def waiting(seconds)
@i = create_output(:buffered)
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.start
@i.after_start
end

test 'default flush_mode is set to :interval' do
Expand Down Expand Up @@ -922,6 +931,7 @@ def waiting(seconds)
@i = create_output(:buffered)
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.start
@i.after_start
end

test 'default flush_mode is set to :interval' do
Expand Down Expand Up @@ -1134,6 +1144,7 @@ def waiting(seconds)
@i = create_output(:buffered)
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.start
@i.after_start

assert_equal :interval, @i.instance_eval{ @flush_mode }
end
Expand All @@ -1150,6 +1161,7 @@ def waiting(seconds)
@i = create_output(:buffered)
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.start
@i.after_start

assert_equal :lazy, @i.instance_eval{ @flush_mode }
end
Expand All @@ -1168,6 +1180,7 @@ def waiting(seconds)
@i = create_output(:delayed)
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.start
@i.after_start
end

test '#format is called for each event streams' do
Expand Down
3 changes: 3 additions & 0 deletions test/plugin/test_output_as_buffered_overflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def waiting(seconds)
@i = create_output()
@i.configure(config_element('ROOT','',{},[config_element('buffer','tag',hash)]))
@i.start
@i.after_start
end

test '#emit_events raises error when buffer is full' do
Expand Down Expand Up @@ -108,6 +109,7 @@ def waiting(seconds)
@i = create_output()
@i.configure(config_element('ROOT','',{'log_level' => 'debug'},[config_element('buffer','tag',hash)]))
@i.start
@i.after_start
end

test '#emit_events blocks until any queues are flushed' do
Expand Down Expand Up @@ -169,6 +171,7 @@ def waiting(seconds)
@i = create_output()
@i.configure(config_element('ROOT','',{'log_level' => 'debug'},[config_element('buffer','tag',hash)]))
@i.start
@i.after_start
end

test '#emit_events will success by dropping oldest chunk' do
Expand Down
Loading