From 132ba2d1dd8f33e4ac98bc35cb543bfc7b5fe062 Mon Sep 17 00:00:00 2001 From: ganmacs Date: Wed, 28 Sep 2016 19:54:00 +0900 Subject: [PATCH] now --- example/counter.conf | 2 +- lib/fluent/counter/client.rb | 14 +++++++++----- lib/fluent/counter/server.rb | 4 +++- lib/fluent/counter/store.rb | 22 ++++++++++++++++++---- lib/fluent/plugin/in_dummy.rb | 5 +++++ test/counter/test_server.rb | 4 ++-- 6 files changed, 38 insertions(+), 13 deletions(-) diff --git a/example/counter.conf b/example/counter.conf index b7072d23e0..ff5958fc04 100644 --- a/example/counter.conf +++ b/example/counter.conf @@ -1,7 +1,7 @@ scope server1 - endpoint 127.0.0.1 + endpoint 127.0.0.1:4321 path tmp/back diff --git a/lib/fluent/counter/client.rb b/lib/fluent/counter/client.rb index 60e1f5ca9f..79c58eec33 100644 --- a/lib/fluent/counter/client.rb +++ b/lib/fluent/counter/client.rb @@ -15,6 +15,7 @@ # require 'cool.io' +require 'timeout' require 'fluent/counter/base_socket' module Fluent @@ -24,8 +25,8 @@ class Client DEFAULT_HOST = '127.0.0.1' ID_LIMIT_COUNT = 1 << 31 - def initialize(loop = Coolio::Loop.new, opt = {}) - @loop = loop + def initialize(loop = nil, opt = {}) + @loop = loop || Coolio::Loop.new @port = opt[:port] || DEFAULT_PORT @host = opt[:host] || DEFAULT_HOST @log = opt[:log] || $log @@ -207,15 +208,18 @@ def data def get # Block until `set` method is called and @result is set a value join if @result.nil? + @result end private def join - until @set - @mutex.synchronize do - @loop.run_once(0.0001) # retun a lock as soon as possible + Timeout.timeout(5) do + until @set + @mutex.synchronize do + @loop.run_once(0.0001) # retun a lock as soon as possible + end end end end diff --git a/lib/fluent/counter/server.rb b/lib/fluent/counter/server.rb index d68bf1d106..f4878b4ead 100644 --- a/lib/fluent/counter/server.rb +++ b/lib/fluent/counter/server.rb @@ -38,7 +38,7 @@ def initialize(name, opt = {}) @store = Fluent::Counter::Store.new(opt) @mutex_hash = MutexHash.new(@store) - @server = Coolio::TCPServer.new(@host, @port, Handler, method(:on_message)) + @server = Coolio::TCPServer.new(@host, @port.to_i, Handler, method(:on_message)) @thread = nil @running = false end @@ -50,6 +50,7 @@ def start @loop.run(0.5) @running = false end + @log.debug('Start Counter Server') @mutex_hash.start self end @@ -61,6 +62,7 @@ def stop @loop.stop if @running @mutex_hash.stop @thread.join if @thread + @log.debug('Stop Counter Server') end def on_message(data) diff --git a/lib/fluent/counter/store.rb b/lib/fluent/counter/store.rb index 9e18ca4183..fdc5ce70b4 100644 --- a/lib/fluent/counter/store.rb +++ b/lib/fluent/counter/store.rb @@ -25,12 +25,26 @@ def self.gen_key(scope, key) "#{scope}\t#{key}" end + attr_reader :log + def initialize(opt = {}) # Notice: This storage is not be implemented auto save. - @storage = Fluent::Plugin::LocalStorage.new - @storage.configure( - Fluent::Config::Element.new('storage', {}, {'persistent' => true, 'path' => opt[:path] }, []) - ) + @log = opt[:log] || $log + @storage = Plugin.new_storage('local', parent: self) + conf = if opt[:path] + {'persistent' => true, 'path' => opt[:path] } + else + {'persistent' => false } + end + @storage.configure(Fluent::Config::Element.new('storage', {}, conf, [])) + end + + def plugin_id + 'hoge' + end + + def plugin_id_configured? + false end def start diff --git a/lib/fluent/plugin/in_dummy.rb b/lib/fluent/plugin/in_dummy.rb index c69c28fcea..da8f039c71 100644 --- a/lib/fluent/plugin/in_dummy.rb +++ b/lib/fluent/plugin/in_dummy.rb @@ -18,6 +18,7 @@ require 'fluent/plugin/input' require 'fluent/config/error' +require 'fluent/counter' module Fluent::Plugin class DummyInput < Input @@ -58,6 +59,9 @@ class DummyInput < Input def initialize super @storage = nil + + @c = Fluent::Counter::Client.new(nil, { port: 4321, host: '127.0.0.1' }) + @c.establish("hoge") end def configure(conf) @@ -102,6 +106,7 @@ def emit(num) if @size > 1 num.times do router.emit_array(@tag, Array.new(@size) { [Fluent::Engine.now, generate] }) + # @c.inc({ name: 'foo', value: 1 }) end else num.times { router.emit(@tag, Fluent::Engine.now, generate) } diff --git a/test/counter/test_server.rb b/test/counter/test_server.rb index f3c81eae75..ec43fd0977 100644 --- a/test/counter/test_server.rb +++ b/test/counter/test_server.rb @@ -14,7 +14,7 @@ class CounterServerTest < ::Test::Unit::TestCase @scope = "server\tworker\tplugin" @server_name = 'server1' - @server = Fluent::Counter::Server.new(@server_name, opt: { log: $log }) + @server = Fluent::Counter::Server.new(@server_name, opt: { log: $log, path: 'file/path' }) @server.instance_eval { @server.close } end @@ -30,7 +30,7 @@ def extract_value_from_counter(counter, scope, name) test 'raise an error when server name is invalid' do assert_raise do - Fluent::Counter::Server.new("\tinvalid_name") + Fluent::Counter::Server.new("\tinvalid_name", opt: { path: 'file/path' }) end end