-
Notifications
You must be signed in to change notification settings - Fork 170
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add TimerTask #782
Add TimerTask #782
Changes from all commits
3173553
36c5580
6559a3b
4f20f44
d755a54
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
local uuid = require 'resty.jit-uuid' | ||
local setmetatable = setmetatable | ||
local getmetatable = getmetatable | ||
local newproxy = newproxy | ||
local unpack = unpack | ||
|
||
local _M = {} | ||
|
||
local default_interval_seconds = 60 | ||
|
||
_M.active_tasks = {} | ||
|
||
function _M.register_task(id) | ||
_M.active_tasks[id] = true | ||
end | ||
|
||
function _M.unregister_task(id) | ||
_M.active_tasks[id] = nil | ||
end | ||
|
||
function _M.task_is_active(id) | ||
return _M.active_tasks[id] or false | ||
end | ||
|
||
local function generate_id() | ||
return uuid.generate_v4() | ||
end | ||
|
||
local function mt(id) | ||
-- Using 'newproxy' is needed to be able to define __gc(). | ||
-- With __gc we can make sure that the task will stop scheduling more work | ||
-- after it has been garbage collected. | ||
local proxy = newproxy(true) | ||
local res_mt = getmetatable(proxy) | ||
res_mt.__gc = function() _M.unregister_task(id) end | ||
res_mt.__index = _M | ||
return res_mt | ||
end | ||
|
||
--- Initialize a TimerTask. | ||
-- @tparam function task The function to be run periodically | ||
-- @tparam[opt] table opts | ||
-- @tfield ?table args Arguments to the function | ||
-- @tfield ?number interval Interval in seconds (defaults to 60) | ||
function _M.new(task, opts) | ||
local options = opts or {} | ||
|
||
local id = generate_id() | ||
|
||
local self = setmetatable({}, mt(id)) | ||
self.task = task | ||
self.args = options.args | ||
self.interval = options.interval or default_interval_seconds | ||
self.id = id | ||
|
||
_M.register_task(id) | ||
|
||
return self | ||
end | ||
|
||
local run_periodic, schedule_next, timer_execute | ||
|
||
run_periodic = function(self) | ||
if not _M.task_is_active(self.id) then return end | ||
|
||
self.task(unpack(self.args)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would be good to use 4a5a815 to make this safe. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I can do that after resty.concurrent is merged. |
||
|
||
schedule_next(self) | ||
end | ||
|
||
-- Note: ngx.timer.at always sends "premature" as the first param. | ||
-- "premature" is boolean value indicating whether it is a premature timer | ||
-- expiration. | ||
timer_execute = function(_, self) | ||
run_periodic(self) | ||
end | ||
|
||
schedule_next = function(self) | ||
local ok, err = ngx.timer.at(self.interval, timer_execute, self) | ||
|
||
if not ok then | ||
ngx.log(ngx.ERR, "failed to schedule timer task: ", err) | ||
end | ||
end | ||
|
||
function _M:execute() | ||
run_periodic(self) | ||
end | ||
|
||
function _M:cancel() | ||
_M.unregister_task(self.id) | ||
end | ||
|
||
return _M |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
local TimerTask = require('resty.concurrent.timer_task') | ||
local ngx_timer = ngx.timer | ||
|
||
describe('TimerTask', function() | ||
local test_task = function() end | ||
|
||
before_each(function() | ||
TimerTask.active_tasks = {} | ||
end) | ||
|
||
describe('.register_task', function() | ||
it('adds an ID to the list of active tasks', function() | ||
local id = '1' | ||
|
||
TimerTask.register_task(id) | ||
|
||
assert.is_true(TimerTask.task_is_active(id)) | ||
end) | ||
end) | ||
|
||
describe('.unregister_task', function() | ||
local id = '1' | ||
|
||
setup(function() | ||
TimerTask.register_task(id) | ||
end) | ||
|
||
it('removes an ID to the list of active tasks', function() | ||
TimerTask.unregister_task(id) | ||
assert.is_false(TimerTask.task_is_active(id)) | ||
end) | ||
end) | ||
|
||
describe(':new', function() | ||
it('adds the task to the list of active tasks', function() | ||
local task = TimerTask.new(test_task) | ||
|
||
assert.is_true(TimerTask.task_is_active(task.id)) | ||
end) | ||
|
||
it('sets a default interval of 60s when not specified', function() | ||
local task = TimerTask.new(test_task) | ||
|
||
assert.equals(60, task.interval) | ||
end) | ||
|
||
it('allows to set a running interval', function() | ||
local interval = 10 | ||
|
||
local task = TimerTask.new(test_task, { interval = interval }) | ||
|
||
assert.equals(interval, task.interval) | ||
end) | ||
|
||
it('allows to set arguments for the task', function() | ||
local args = { '1', '2' } | ||
|
||
local task = TimerTask.new(test_task, { args = args }) | ||
|
||
assert.same(args, task.args) | ||
end) | ||
end) | ||
|
||
describe(':cancel', function() | ||
it('removes the task from the list of active tasks', function() | ||
local task = TimerTask.new(test_task) | ||
|
||
task:cancel() | ||
|
||
assert.is_false(TimerTask.task_is_active(task.id)) | ||
end) | ||
end) | ||
|
||
describe(':execute', function() | ||
local func = test_task | ||
local ngx_timer_stub | ||
|
||
local args = { '1', '2', '3' } | ||
local interval = 10 | ||
|
||
before_each(function() | ||
ngx_timer_stub = stub(ngx_timer, 'at') | ||
end) | ||
|
||
describe('when the task is active', function() | ||
it('runs the task', function() | ||
local timer_task = TimerTask.new(func, { args = args, interval = interval }) | ||
local func_stub = stub(timer_task, 'task') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shadowing upvalue 'func_stub' on line 76 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Codeclimate does not update comments after initial review? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like it writes another comment without deleting the old one. |
||
|
||
timer_task:execute() | ||
|
||
assert.stub(func_stub).was_called_with(unpack(args)) | ||
end) | ||
|
||
it('schedules the next one', function() | ||
local timer_task = TimerTask.new(func, { args = args, interval = interval }) | ||
|
||
timer_task:execute() | ||
|
||
assert.stub(ngx_timer_stub).was_called() | ||
end) | ||
end) | ||
|
||
describe('when the task is not active', function() | ||
it('does not run the task', function() | ||
local timer_task = TimerTask.new(func, { args = args, interval = interval }) | ||
local func_stub = stub(timer_task, 'task') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shadowing upvalue 'func_stub' on line 76 |
||
timer_task:cancel() | ||
|
||
timer_task:execute() | ||
|
||
assert.stub(func_stub).was_not_called() | ||
end) | ||
|
||
it('does not schedule another task', function() | ||
local timer_task = TimerTask.new(func, { args = args, interval = interval }) | ||
timer_task:cancel() | ||
|
||
timer_task:execute() | ||
|
||
assert.stub(ngx_timer_stub).was_not_called() | ||
end) | ||
end) | ||
end) | ||
|
||
it('cancels itself when it is garbage collected', function() | ||
local timer_task = TimerTask.new(test_task) | ||
local id = timer_task.id | ||
|
||
timer_task = nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. value assigned to variable 'timer_task' is unused |
||
collectgarbage() | ||
|
||
assert.is_false(TimerTask.task_is_active(id)) | ||
end) | ||
end) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is very weird style. Why not just define each function as local?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. They all depend on each other so they need to be defined this way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exactly.