Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make engine lighter #2667

Merged
merged 14 commits into from
Oct 29, 2019
98 changes: 12 additions & 86 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,38 +22,28 @@
require 'fluent/time'
require 'fluent/system_config'
require 'fluent/plugin'
require 'fluent/fluent_log_event_router'

module Fluent
class EngineClass
def initialize
@root_agent = nil
@default_loop = nil
@engine_stopped = false
@_worker_id = nil

@log_event_router = nil
@log_emit_thread = nil
@log_event_loop_stop = false
@log_event_loop_graceful_stop = false
@log_event_queue = []
@log_event_verbose = false

@suppress_config_dump = false
@without_source = false

@fluent_log_event_router = nil
@system_config = SystemConfig.new

@dry_run_mode = false
end

MAINLOOP_SLEEP_INTERVAL = 0.3

MATCH_CACHE_SIZE = 1024
LOG_EMIT_INTERVAL = 0.1

attr_reader :root_agent
attr_reader :matches, :sources
attr_reader :system_config

attr_reader :root_agent, :system_config
attr_accessor :dry_run_mode

def init(system_config)
Expand Down Expand Up @@ -113,43 +103,14 @@ def run_configure(conf)
end

def configure(conf)
# plugins / configuration dumps
Gem::Specification.find_all.select{|x| x.name =~ /^fluent(d|-(plugin|mixin)-.*)$/}.each do |spec|
$log.info :worker0, "gem '#{spec.name}' version '#{spec.version}'"
end

@root_agent.configure(conf)

begin
log_event_agent = @root_agent.find_label(Fluent::Log::LOG_EVENT_LABEL)
log_event_router = log_event_agent.event_router

# suppress mismatched tags only for <label @FLUENT_LOG> label.
# it's not suppressed in default event router for non-log-event events
log_event_router.suppress_missing_match!
@fleunt_log_event_router = FluentLogEventRouter.build(@root_agent)

@log_event_router = log_event_router

unmatched_tags = Fluent::Log.event_tags.select{|t| !@log_event_router.match?(t) }
unless unmatched_tags.empty?
$log.warn "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
end
rescue ArgumentError # ArgumentError "#{label_name} label not found"
# use default event router if <label @FLUENT_LOG> is missing in configuration
log_event_router = @root_agent.event_router

if Fluent::Log.event_tags.any?{|t| log_event_router.match?(t) }
@log_event_router = log_event_router

unmatched_tags = Fluent::Log.event_tags.select{|t| !@log_event_router.match?(t) }
unless unmatched_tags.empty?
$log.warn "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
end
end
if @fleunt_log_event_router.emittable?
$log.enable_event(true)
end

$log.enable_event(true) if @log_event_router

unless @suppress_config_dump
$log.info :supervisor, "using configuration file: #{conf.to_s.rstrip}"
end
Expand Down Expand Up @@ -180,39 +141,12 @@ def now
Fluent::EventTime.now
end

def log_event_loop
$log.disable_events(Thread.current)

while sleep(LOG_EMIT_INTERVAL)
break if @log_event_loop_stop
break if @log_event_loop_graceful_stop && @log_event_queue.empty?
next if @log_event_queue.empty?

# NOTE: thead-safe of slice! depends on GVL
events = @log_event_queue.slice!(0..-1)
next if events.empty?

events.each {|tag,time,record|
begin
@log_event_router.emit(tag, time, record)
rescue => e
# This $log.error doesn't emit log events, because of `$log.disable_events(Thread.current)` above
$log.error "failed to emit fluentd's log event", tag: tag, event: record, error: e
end
}
end
end

def run
begin
$log.info "starting fluentd worker", pid: Process.pid, ppid: Process.ppid, worker: worker_id
start

if @log_event_router
$log.enable_event(true)
@log_emit_thread = Thread.new(&method(:log_event_loop))
@log_emit_thread.abort_on_exception = true
end
@fleunt_log_event_router.start

$log.info "fluentd worker is now running", worker: worker_id
sleep MAINLOOP_SLEEP_INTERVAL until @engine_stopped
Expand All @@ -226,19 +160,12 @@ def run

unless @log_event_verbose
$log.enable_event(false)
if @log_emit_thread
# to make sure to emit all log events into router, before shutting down
@log_event_loop_graceful_stop = true
@log_emit_thread.join
@log_emit_thread = nil
end
@fleunt_log_event_router.graceful_stop
end
$log.info "shutting down fluentd worker", worker: worker_id
shutdown
if @log_emit_thread
@log_event_loop_stop = true
@log_emit_thread.join
end

@fleunt_log_event_router.stop
end

def stop
Expand All @@ -247,8 +174,7 @@ def stop
end

def push_log_event(tag, time, record)
return if @log_emit_thread.nil?
@log_event_queue.push([tag, time, record])
@fleunt_log_event_router.emit_event([tag, time, record])
end

def worker_id
Expand Down
137 changes: 137 additions & 0 deletions lib/fluent/fluent_log_event_router.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/log'

module Fluent
# DO NOT write any logic here
class NullFluentLogEventRouter
def start; end

def stop; end

def graceful_stop; end

def emit_event(_event); end

def emittable?
self.class != NullFluentLogEventRouter
end
end

# This class is for handling fluentd's inner log
# e.g. <label @FLUNT_LOG> section and <match fluent.**> section
class FluentLogEventRouter < NullFluentLogEventRouter
# @param root_agent [Fluent::RootAgent]
def self.build(root_agent)
log_event_router = nil

begin
log_event_agent = root_agent.find_label(Fluent::Log::LOG_EVENT_LABEL)
log_event_router = log_event_agent.event_router

# suppress mismatched tags only for <label @FLUENT_LOG> label.
# it's not suppressed in default event router for non-log-event events
log_event_router.suppress_missing_match!

log_event_router = log_event_router

unmatched_tags = Fluent::Log.event_tags.select { |t| !log_event_router.match?(t) }
unless unmatched_tags.empty?
$log.warn "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
end

rescue ArgumentError # ArgumentError "#{label_name} label not found"
# use default event router if <label @FLUENT_LOG> is missing in configuration
root_log_event_router = root_agent.event_router

if Fluent::Log.event_tags.any? { |t| root_log_event_router.match?(t) }
log_event_router = root_log_event_router

unmatched_tags = Fluent::Log.event_tags.select { |t| !log_event_router.match?(t) }
unless unmatched_tags.empty?
$log.warn "match for some tags of log events are not defined (to be ignored)", tags: unmatched_tags
end
end
end

if log_event_router
FluentLogEventRouter.new(log_event_router)
else
$log.debug('No fluent logger for internal event')
NullFluentLogEventRouter.new
end
end

STOP = :stop
GRACEFUL_STOP = :graceful_stop

# @param event_router [Fluent::EventRouter]
def initialize(event_router)
@event_router = event_router
@thread = nil
@graceful_stop = false
@event_queue = Queue.new
end

def start
@thread = Thread.new do
$log.disable_events(Thread.current)

loop do
event = @event_queue.pop

case event
when GRACEFUL_STOP
@graceful_stop = true
when STOP
break
else
begin
tag, time, record = event
@event_router.emit(tag, time, record)
rescue => e
# This $log.error doesn't emit log events, because of `$log.disable_events(Thread.current)` above
$log.error "failed to emit fluentd's log event", tag: tag, event: record, error: e
end
end

if @graceful_stop && @event_queue.empty?
break
end
end
end

@thread.abort_on_exception = true
end

def stop
@event_queue.push(STOP)
# there is no problem calling Thread#join multiple times.
@thread && @thread.join
end

def graceful_stop
# to make sure to emit all log events into router, before shutting down
@event_queue.push(GRACEFUL_STOP)
@thread && @thread.join
end

def emit_event(event)
@event_queue.push(event)
end
end
end
11 changes: 9 additions & 2 deletions lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ def dry_run
Fluent::Engine.dry_run_mode = true
change_privilege
MessagePackFactory.init
init_engine
init_engine(supervisor: true)
run_configure
rescue Fluent::ConfigError => e
$log.error "config error", file: @config_path, error: e
Expand Down Expand Up @@ -794,7 +794,7 @@ def change_privilege
ServerEngine::Privilege.change(@chuser, @chgroup)
end

def init_engine
def init_engine(supervisor: false)
Fluent::Engine.init(@system_config)

@libs.each {|lib|
Expand All @@ -807,6 +807,13 @@ def init_engine
Fluent::Engine.add_plugin_dir(dir)
end
}

if supervisor
# plugins / configuration dumps
Gem::Specification.find_all.select { |x| x.name =~ /^fluent(d|-(plugin|mixin)-.*)$/ }.each do |spec|
$log.info("gem '#{spec.name}' version '#{spec.version}'")
end
end
end

def run_configure
Expand Down
Loading