Skip to content

Commit

Permalink
Merge pull request #1374 from fluent/root-directory-per-worker-process
Browse files Browse the repository at this point in the history
Root directory per worker process
  • Loading branch information
tagomoris authored Dec 20, 2016
2 parents 12c740c + 801c5e4 commit 934ddf9
Show file tree
Hide file tree
Showing 21 changed files with 720 additions and 142 deletions.
9 changes: 2 additions & 7 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@
# limitations under the License.
#

require 'socket'

require 'cool.io'

require 'fluent/config'
require 'fluent/event'
require 'fluent/event_router'
Expand Down Expand Up @@ -58,8 +54,6 @@ def initialize
def init(system_config)
@system_config = system_config

BasicSocket.do_not_reverse_lookup = true

suppress_interval(system_config.emit_error_log_interval) unless system_config.emit_error_log_interval.nil?
@suppress_config_dump = system_config.suppress_config_dump unless system_config.suppress_config_dump.nil?
@without_source = system_config.without_source unless system_config.without_source.nil?
Expand Down Expand Up @@ -172,7 +166,8 @@ def log_event_loop

def run
begin
$log.info "starting fluentd worker", pid: Process.pid, ppid: Process.ppid # TODO: worker number
worker_id = ENV['SERVERENGINE_WORKER_ID']
$log.info "starting fluentd worker", pid: Process.pid, ppid: Process.ppid, worker: worker_id
start

if @event_router.match?($log.tag)
Expand Down
5 changes: 0 additions & 5 deletions lib/fluent/log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -431,11 +431,6 @@ def configure(conf)
end
end

def start
@log.reset
super
end

def terminate
super
@log.reset
Expand Down
11 changes: 11 additions & 0 deletions lib/fluent/plugin/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,24 @@ def initialize
super
@_state = State.new(false, false, false, false, false, false, false, false, false)
@_context_router = nil
@_fluentd_worker_id = nil
@under_plugin_development = false
end

def has_router?
false
end

def plugin_root_dir
nil # override this in plugin_id.rb
end

def fluentd_worker_id
return @_fluentd_worker_id if @_fluentd_worker_id
@_fluentd_worker_id = (ENV['SERVERENGINE_WORKER_ID'] || 0).to_i
@_fluentd_worker_id
end

def configure(conf)
super
@_state ||= State.new(false, false, false, false, false, false, false, false, false)
Expand Down
16 changes: 9 additions & 7 deletions lib/fluent/plugin/buf_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,15 @@ class FileBuffer < Fluent::Plugin::Buffer

DIR_PERMISSION = 0755

# TODO: buffer_path based on system config
desc 'The path where buffer chunks are stored.'
config_param :path, :string
config_param :path, :string, default: nil

config_set_default :chunk_limit_size, DEFAULT_CHUNK_LIMIT_SIZE
config_set_default :total_limit_size, DEFAULT_TOTAL_LIMIT_SIZE

config_param :file_permission, :string, default: nil # '0644'
config_param :dir_permission, :string, default: nil # '0755'

##TODO: Buffer plugin cannot handle symlinks because new API @stage has many writing buffer chunks
## re-implement this feature on out_file, w/ enqueue_chunk(or generate_chunk) hook + chunk.path
# attr_accessor :symlink_path

@@buffer_paths = {}

def initialize
Expand All @@ -56,6 +51,14 @@ def initialize
def configure(conf)
super

unless @path
if root_dir = owner.plugin_root_dir
@path = File.join(root_dir, 'buffer')
else
raise Fluent::ConfigError, "buffer path is not configured. specify 'path' in <buffer>"
end
end

type_of_owner = Plugin.lookup_type_from_class(@_owner.class)
if @@buffer_paths.has_key?(@path) && !buffer_path_for_test?
type_using_this_path = @@buffer_paths[@path]
Expand All @@ -64,7 +67,6 @@ def configure(conf)

@@buffer_paths[@path] = type_of_owner

# TODO: create buffer path with plugin_id, under directory specified by system config
if File.exist?(@path)
if File.directory?(@path)
@path = File.join(@path, 'buffer.*.log')
Expand Down
1 change: 1 addition & 0 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def configure(conf)
raise Fluent::ConfigError, "tail: 'path' parameter is required on tail input"
end

# TODO: Use plugin_root_dir and storage plugin to store positions if available
unless @pos_file
$log.warn "'pos_file PATH' parameter is not set to a 'tail' source."
$log.warn "this parameter is highly recommended to save the position to resume tailing."
Expand Down
29 changes: 16 additions & 13 deletions lib/fluent/plugin/storage_local.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,16 @@ class LocalStorage < Storage
DEFAULT_FILE_MODE = 0644

config_param :path, :string, default: nil
config_param :mode, :integer, default: DEFAULT_FILE_MODE
config_param :dir_mode, :integer, default: DEFAULT_DIR_MODE
config_param :mode, default: DEFAULT_FILE_MODE do |v|
v.to_i(8)
end
config_param :dir_mode, default: DEFAULT_DIR_MODE do |v|
v.to_i(8)
end
config_param :pretty_print, :bool, default: false

attr_reader :store # for test

def initialize
super
@store = {}
Expand All @@ -42,9 +48,13 @@ def configure(conf)
super

@on_memory = false
if !@path && !@_plugin_id_configured
if @path
# use it
elsif root_dir = owner.plugin_root_dir
@path = File.join(root_dir, 'storage.json')
else
if @persistent
raise Fluent::ConfigError, "Plugin @id or path for <storage> required to save data"
raise Fluent::ConfigError, "Plugin @id or path for <storage> required when 'persistent' is true"
else
if @autosave
log.warn "both of Plugin @id and path for <storage> are not specified. Using on-memory store."
Expand All @@ -53,18 +63,11 @@ def configure(conf)
end
@on_memory = true
end
elsif @path
# ok
else # @_plugin_id_configured is true
log.warn "path for <storage> is not specified. Using on-memory store temporarily, but will use file store after support global storage path"
@on_memory = true
## TODO: get process-wide directory for plugin storage, and generate path for this plugin storage instance
# path =
end

if !@on_memory
dir = File.dirname(@path)
FileUtils.mkdir_p(dir, mode: @dir_mode) unless File.exist?(dir)
FileUtils.mkdir_p(dir, mode: @dir_mode) unless Dir.exist?(dir)
if File.exist?(@path)
raise Fluent::ConfigError, "Plugin storage path '#{@path}' is not readable/writable" unless File.readable?(@path) && File.writable?(@path)
begin
Expand All @@ -75,7 +78,7 @@ def configure(conf)
raise Fluent::ConfigError, "Unexpected error: failed to read data from plugin storage file: '#{@path}'"
end
else
raise Fluent::ConfigError, "Directory is not writable for plugin storage file '#{dir}'" unless File.writable?(dir)
raise Fluent::ConfigError, "Directory is not writable for plugin storage file '#{@path}'" unless File.stat(dir).writable?
end
end
end
Expand Down
30 changes: 21 additions & 9 deletions lib/fluent/plugin_helper/storage.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ module Storage
StorageState = Struct.new(:storage, :running)

def storage_create(usage: '', type: nil, conf: nil, default_type: nil)
if conf && !conf.arg.empty?
usage = conf.arg
end

s = @_storages[usage]
if s && s.running
return s.storage
Expand Down Expand Up @@ -72,7 +76,7 @@ def storage_create(usage: '', type: nil, conf: nil, default_type: nil)
module StorageParams
include Fluent::Configurable
# minimum section definition to instantiate storage plugin instances
config_section :storage, required: false, multi: true, param_name: :storage_configs do
config_section :storage, required: false, multi: true, param_name: :storage_configs, init: true do
config_argument :usage, :string, default: ''
config_param :@type, :string, default: Fluent::Plugin::Storage::DEFAULT_TYPE
end
Expand Down Expand Up @@ -194,6 +198,10 @@ def initialize(storage)
def_delegators :@storage, :start, :stop, :before_shutdown, :shutdown, :after_shutdown, :close, :terminate
def_delegators :@storage, :started?, :stopped?, :before_shutdown?, :shutdown?, :after_shutdown?, :closed?, :terminated?

def method_missing(name, *args)
@monitor.synchronize{ @storage.__send__(name, *args) }
end

def persistent_always?
true
end
Expand Down Expand Up @@ -274,14 +282,18 @@ class SynchronizeWrapper

def initialize(storage)
@storage = storage
@mutex = Mutex.new
@monitor = Monitor.new
end

def_delegators :@storage, :persistent, :autosave, :autosave_interval, :save_at_shutdown
def_delegators :@storage, :persistent_always?
def_delegators :@storage, :start, :stop, :before_shutdown, :shutdown, :after_shutdown, :close, :terminate
def_delegators :@storage, :started?, :stopped?, :before_shutdown?, :shutdown?, :after_shutdown?, :closed?, :terminated?

def method_missing(name, *args)
@monitor.synchronize{ @storage.__send__(name, *args) }
end

def synchronized?
true
end
Expand All @@ -291,35 +303,35 @@ def implementation
end

def load
@mutex.synchronize do
@monitor.synchronize do
@storage.load
end
end

def save
@mutex.synchronize do
@monitor.synchronize do
@storage.save
end
end

def get(key)
@mutex.synchronize{ @storage.get(key) }
@monitor.synchronize{ @storage.get(key) }
end

def fetch(key, defval)
@mutex.synchronize{ @storage.fetch(key, defval) }
@monitor.synchronize{ @storage.fetch(key, defval) }
end

def put(key, value)
@mutex.synchronize{ @storage.put(key, value) }
@monitor.synchronize{ @storage.put(key, value) }
end

def delete(key)
@mutex.synchronize{ @storage.delete(key) }
@monitor.synchronize{ @storage.delete(key) }
end

def update(key, &block)
@mutex.synchronize do
@monitor.synchronize do
v = block.call(@storage.get(key))
@storage.put(key, v)
v
Expand Down
17 changes: 17 additions & 0 deletions lib/fluent/plugin_id.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ module Fluent
module PluginId
@@configured_ids = Set.new

def initialize
super
@_plugin_root_dir = nil
end

def configure(conf)
@id = conf['@id']
@_id_configured = !!@id # plugin id is explicitly configured by users (or not)
Expand Down Expand Up @@ -59,5 +64,17 @@ def plugin_id
"object:#{object_id.to_s(16)}"
end
end

def plugin_root_dir
return @_plugin_root_dir if @_plugin_root_dir
return nil unless system_config.root_dir
return nil unless plugin_id_configured?

# Fluent::Plugin::Base#fluentd_worker_id
dir = File.join(system_config.root_dir, "worker#{fluentd_worker_id}", plugin_id)
FileUtils.mkdir_p(dir) unless Dir.exist?(dir)
@_plugin_root_dir = dir.freeze
dir
end
end
end
27 changes: 25 additions & 2 deletions lib/fluent/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

require 'etc'
require 'fcntl'
require 'fileutils'

require 'fluent/config'
require 'fluent/env'
Expand Down Expand Up @@ -192,7 +193,10 @@ def supervisor_get_dump_config_handler
module WorkerModule
def spawn(process_manager)
main_cmd = config[:main_cmd]
@pm = process_manager.spawn(*main_cmd)
env = {
'SERVERENGINE_WORKER_ID' => @worker_id.to_i.to_s,
}
@pm = process_manager.spawn(env, *main_cmd)
end

def after_start
Expand Down Expand Up @@ -226,6 +230,7 @@ def self.load_config(path, params = {})
fluentd_conf = Fluent::Config.parse(config_data, config_fname, config_basedir, params['use_v1_config'])
system_config = SystemConfig.create(fluentd_conf)

root_dir = system_config.root_dir || params['root_dir']
log_level = system_config.log_level || params['log_level']
suppress_repeated_stacktrace = system_config.suppress_repeated_stacktrace || params['suppress_repeated_stacktrace']
log_path = params['log_path']
Expand Down Expand Up @@ -264,6 +269,7 @@ def self.load_config(path, params = {})
auto_heartbeat: false,
unrecoverable_exit_codes: [2],
stop_immediately_at_unrecoverable_exit: true,
root_dir: root_dir,
logger: logger,
log: logger.out,
log_path: log_path,
Expand Down Expand Up @@ -365,6 +371,7 @@ def self.default_options
setup_path: nil,
chuser: nil,
chgroup: nil,
root_dir: nil,
suppress_interval: 0,
suppress_repeated_stacktrace: true,
without_source: false,
Expand Down Expand Up @@ -393,6 +400,7 @@ def initialize(opt)
@rpc_server = nil
@process_name = nil

@root_dir = opt[:root_dir]
@log_level = opt[:log_level]
@log_rotate_age = opt[:log_rotate_age]
@log_rotate_size = opt[:log_rotate_size]
Expand All @@ -417,6 +425,20 @@ def run_supervisor
read_config
set_system_config

if @root_dir
if File.exist?(@root_dir)
unless Dir.exist?(@root_dir)
raise Fluent::InvalidRootDirectory, "non directory entry exists:#{@root_dir}"
end
else
begin
FileUtils.mkdir_p(@root_dir)
rescue => e
raise Fluent::InvalidRootDirectory, "failed to create root directory:#{@root_dir}, #{e.inspect}"
end
end
end

dry_run if @dry_run
supervise
end
Expand All @@ -426,7 +448,8 @@ def options
'config_path' => @config_path,
'pid_file' => @daemonize,
'plugin_dirs' => @plugin_dirs,
'log_path' => @log_path
'log_path' => @log_path,
'root_dir' => @root_dir,
}
end

Expand Down
Loading

0 comments on commit 934ddf9

Please sign in to comment.