Skip to content

Commit

Permalink
now
Browse files Browse the repository at this point in the history
  • Loading branch information
ganmacs committed Sep 28, 2016
1 parent f81c3ea commit 132ba2d
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 13 deletions.
2 changes: 1 addition & 1 deletion example/counter.conf
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<system>
<counter_server>
scope server1
endpoint 127.0.0.1
endpoint 127.0.0.1:4321
path tmp/back
</counter_server>
</system>
Expand Down
14 changes: 9 additions & 5 deletions lib/fluent/counter/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#

require 'cool.io'
require 'timeout'
require 'fluent/counter/base_socket'

module Fluent
Expand All @@ -24,8 +25,8 @@ class Client
DEFAULT_HOST = '127.0.0.1'
ID_LIMIT_COUNT = 1 << 31

def initialize(loop = Coolio::Loop.new, opt = {})
@loop = loop
def initialize(loop = nil, opt = {})
@loop = loop || Coolio::Loop.new
@port = opt[:port] || DEFAULT_PORT
@host = opt[:host] || DEFAULT_HOST
@log = opt[:log] || $log
Expand Down Expand Up @@ -207,15 +208,18 @@ def data
def get
# Block until `set` method is called and @result is set a value
join if @result.nil?

@result
end

private

def join
until @set
@mutex.synchronize do
@loop.run_once(0.0001) # retun a lock as soon as possible
Timeout.timeout(5) do
until @set
@mutex.synchronize do
@loop.run_once(0.0001) # retun a lock as soon as possible
end
end
end
end
Expand Down
4 changes: 3 additions & 1 deletion lib/fluent/counter/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def initialize(name, opt = {})
@store = Fluent::Counter::Store.new(opt)
@mutex_hash = MutexHash.new(@store)

@server = Coolio::TCPServer.new(@host, @port, Handler, method(:on_message))
@server = Coolio::TCPServer.new(@host, @port.to_i, Handler, method(:on_message))
@thread = nil
@running = false
end
Expand All @@ -50,6 +50,7 @@ def start
@loop.run(0.5)
@running = false
end
@log.debug('Start Counter Server')
@mutex_hash.start
self
end
Expand All @@ -61,6 +62,7 @@ def stop
@loop.stop if @running
@mutex_hash.stop
@thread.join if @thread
@log.debug('Stop Counter Server')
end

def on_message(data)
Expand Down
22 changes: 18 additions & 4 deletions lib/fluent/counter/store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,26 @@ def self.gen_key(scope, key)
"#{scope}\t#{key}"
end

attr_reader :log

def initialize(opt = {})
# Notice: This storage is not be implemented auto save.
@storage = Fluent::Plugin::LocalStorage.new
@storage.configure(
Fluent::Config::Element.new('storage', {}, {'persistent' => true, 'path' => opt[:path] }, [])
)
@log = opt[:log] || $log
@storage = Plugin.new_storage('local', parent: self)
conf = if opt[:path]
{'persistent' => true, 'path' => opt[:path] }
else
{'persistent' => false }
end
@storage.configure(Fluent::Config::Element.new('storage', {}, conf, []))
end

def plugin_id
'hoge'
end

def plugin_id_configured?
false
end

def start
Expand Down
5 changes: 5 additions & 0 deletions lib/fluent/plugin/in_dummy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

require 'fluent/plugin/input'
require 'fluent/config/error'
require 'fluent/counter'

module Fluent::Plugin
class DummyInput < Input
Expand Down Expand Up @@ -58,6 +59,9 @@ class DummyInput < Input
def initialize
super
@storage = nil

@c = Fluent::Counter::Client.new(nil, { port: 4321, host: '127.0.0.1' })
@c.establish("hoge")
end

def configure(conf)
Expand Down Expand Up @@ -102,6 +106,7 @@ def emit(num)
if @size > 1
num.times do
router.emit_array(@tag, Array.new(@size) { [Fluent::Engine.now, generate] })
# @c.inc({ name: 'foo', value: 1 })
end
else
num.times { router.emit(@tag, Fluent::Engine.now, generate) }
Expand Down
4 changes: 2 additions & 2 deletions test/counter/test_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class CounterServerTest < ::Test::Unit::TestCase

@scope = "server\tworker\tplugin"
@server_name = 'server1'
@server = Fluent::Counter::Server.new(@server_name, opt: { log: $log })
@server = Fluent::Counter::Server.new(@server_name, opt: { log: $log, path: 'file/path' })
@server.instance_eval { @server.close }
end

Expand All @@ -30,7 +30,7 @@ def extract_value_from_counter(counter, scope, name)

test 'raise an error when server name is invalid' do
assert_raise do
Fluent::Counter::Server.new("\tinvalid_name")
Fluent::Counter::Server.new("\tinvalid_name", opt: { path: 'file/path' })
end
end

Expand Down

0 comments on commit 132ba2d

Please sign in to comment.