Skip to content

Commit

Permalink
tests, annotations and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladislav Grubov committed Jul 20, 2024
1 parent 128f396 commit 2de0410
Show file tree
Hide file tree
Showing 10 changed files with 237 additions and 46 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ jobs:
run-luacheck-linter:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: tarantool/setup-tarantool@v2
- uses: actions/checkout@v4
- uses: tarantool/setup-tarantool@v3
with:
tarantool-version: '2.10.4'

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ jobs:
run-unit-tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: tarantool/setup-tarantool@v2
- uses: actions/checkout@v4
- uses: tarantool/setup-tarantool@v3
with:
tarantool-version: '2.10.4'
- name: install luacov-console
Expand Down
5 changes: 2 additions & 3 deletions .luacheckrc
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
std = "luajit"
std = "tarantool"
codes = true
globals = {
-- Tarantool variable:
"box",
-- Tarantool variables:
"table.deepcopy",
"dostring",

Expand Down
2 changes: 1 addition & 1 deletion .luacov
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ exclude = {
"example",
}

runreport = false
runreport = true
deletestats = false

coveralls = {
Expand Down
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
### TODO

* reload/upgrade and feature switch
* composite primary keys
* switch - turn non-taken task into any state
Expand Down Expand Up @@ -35,7 +36,7 @@

# Interface

## Creator methods:
## Creator methods

* `.upgrade(space, options)`

Expand Down Expand Up @@ -83,7 +84,7 @@ M.upgrade(space, {
})
```

## Producer methods:
## Producer methods

* `space:put`
- `task` - table or array or tuple
Expand Down Expand Up @@ -146,7 +147,7 @@ box.space.myqueue:put({ name="xxx"; data="yyy"; }, { delay = 1.5; ttl = 100 })
+ `update` - table for update, like in space:update
+ `ttl` - timeout for time to live (?)

## Admin methods:
## Admin methods

* `space:dig(id, [attr])` - dig out task from buried state
- `id` - as in `:ack`
Expand Down
4 changes: 2 additions & 2 deletions example-internal-queue/queue.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---@class Queue
---@field q xqueueSpace space with Queue
---@field q xqueue.space space with Queue
---@field opts QueueOptions options of the queue
local M = require 'obj'.class({}, 'queue')

Expand Down Expand Up @@ -131,7 +131,7 @@ function M:_init(cfg)
---@type table<string, {job: any, task: Task, taken_at: number}>
self._task_map = {}

---@cast space xqueueSpace
---@cast space xqueue.space
self.q = space

self._on = {}
Expand Down
35 changes: 22 additions & 13 deletions test/basic_test.lua
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
local fio = require 'fio'
---@diagnostic disable: inject-field
local fiber = require 'fiber'
local xqueue = require 'xqueue'

require 'test.setup'

local t = require 'luatest' --[[@as luatest]]
local g = t.group('basic')

t.before_suite(function()
local tmpdir = assert(fio.tempdir())
box.cfg{ memtx_dir = tmpdir, wal_dir = tmpdir, vinyl_dir = tmpdir }
end)

g.before_each(function()
if box.space.queue then
box.space.queue:truncate()
for i = #box.space.queue.index, 0, -1 do
local ind = box.space.queue.index[i]
ind:drop()
Expand All @@ -22,6 +20,10 @@ end)

function g.test_basic_queue()
box.schema.space.create('queue', { if_not_exists = true })
---@class test.xqueue.basic.tuple: box.tuple
---@field id string
---@field status string
---@field payload any
box.space.queue:format({
{ name = 'id', type = 'string' },
{ name = 'status', type = 'string' },
Expand All @@ -41,7 +43,7 @@ function g.test_basic_queue()
},
})

local task = box.space.queue:put({ payload = { task = 1 } })
local task = box.space.queue:put({ payload = { task = 1 } }) --[[@as test.xqueue.basic.tuple]]
t.assert_equals(task.status, 'R', "newly inserted task must be Ready")
t.assert_items_include(task.payload, { task = 1 }, "payload of the task remains the same")

Expand All @@ -53,7 +55,7 @@ function g.test_basic_queue()
t.assert_le(elapsed, 0.1, "queue:take() must return task after single yield")
t.assert_equals(taken.status, 'T', "taken task must be in status T")

local tuple = box.space.queue:get(taken.id)
local tuple = box.space.queue:get(taken.id) --[[@as test.xqueue.basic.tuple]]
t.assert_equals(tuple.status, 'T', "status of taken task in space must be T")

ft = fiber.time()
Expand All @@ -64,7 +66,7 @@ function g.test_basic_queue()
t.assert_le(elapsed, 0.1, "second queue:take() must be returned in ≤ 0.1 seconds")

box.space.queue:release(taken)
tuple = box.space.queue:get(taken.id)
tuple = box.space.queue:get(taken.id) --[[@as test.xqueue.basic.tuple]]

t.assert_equals(tuple.status, 'R', "queue:release() must return task in R status")
taken = box.space.queue:take(0.001)
Expand Down Expand Up @@ -123,7 +125,12 @@ function g.test_basic_queue()
end

function g.test_delayed_queue()
local queue = box.schema.space.create('queue', { if_not_exists = true }) --[[@as xqueueSpace]]
local queue = box.schema.space.create('queue', { if_not_exists = true }) --[[@as xqueue.space]]
---@class test.xqueue.delayed.tuple: box.tuple
---@field id number
---@field status string
---@field runat number
---@field payload any
queue:format({
{ name = 'id', type = 'unsigned' },
{ name = 'status', type = 'string' },
Expand All @@ -150,17 +157,19 @@ function g.test_delayed_queue()

local task_put_delay_500ms = queue:put({ payload = { id = 2 } }, {
delay = 0.1,
})
}) --[[@as test.xqueue.delayed.tuple]]
t.assert_equals(task_put_delay_500ms.status, 'W', 'queue:put(...,{delay=<>}) must insert task in W status')
t.assert_equals(queue:get({task_put_delay_500ms.id}).status, 'W', 'double check task status in space (must be W)')

local task_put = queue:put({ payload = { id = 1 } })
local task_put = queue:put({ payload = { id = 1 } }) --[[@as test.xqueue.delayed.tuple]]
t.assert_equals(task_put.status, 'R', "queue:put() without delay in delayed queue must set task to R")
t.assert_equals(queue:get({task_put.id}).status, 'R', 'double check task status in space (must be R)')

t.assert_lt(task_put_delay_500ms.id, task_put.id, "first task must have smaller id than second (fifo constraint)")

local taken = queue:take({ timeout = 0.05 })
t.assert(taken, "task must be taken")
---@cast taken -nil
t.assert_equals(taken.id, task_put.id, 'queue:take() must return ready task')

queue:release(taken, { delay = 0.12 })
Expand Down Expand Up @@ -209,7 +218,7 @@ function g.test_delayed_queue()
t.assert_is(queue:get(take.id).status, 'Z', ':ack()+delay return task into Z status')
t.assert_items_equals(queue:get(take.id).payload, { finished = 2 }, ':ack()/update must replace tuple content')

local nothing = queue:take({ timeout = 0.75 })
local nothing = queue:take({ timeout = 1.5 })
t.assert_not(nothing, "Z tasks must are not takable")

t.assert_not(queue:get(take.id), "Z task must be collected after delay time out exhausted")
Expand Down
7 changes: 7 additions & 0 deletions test/setup.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
local fio = require 'fio'
local t = require 'luatest' --[[@as luatest]]

t.before_suite(function()
local tmpdir = assert(fio.tempdir())
box.cfg{ memtx_dir = tmpdir, wal_dir = tmpdir, vinyl_dir = tmpdir }
end)
126 changes: 126 additions & 0 deletions test/tube_test.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
---@diagnostic disable: inject-field
local fiber = require 'fiber'
local xqueue = require 'xqueue'
require 'test.setup'

local t = require 'luatest' --[[@as luatest]]
local g = t.group('tube')

g.before_each(function()
if box.space.queue then
box.space.queue:truncate()
for i = #box.space.queue.index, 0, -1 do
local ind = box.space.queue.index[i]
ind:drop()
end
box.space.queue:drop()
end
end)

function g.test_tube_queue()
local queue = box.schema.space.create('queue', { if_not_exists = true }) --[[@as xqueue.space]]
queue:format({
{ name = 'id', type = 'unsigned' },
{ name = 'tube', type = 'string' },
{ name = 'status', type = 'string' },
{ name = 'runat', type = 'number' },
{ name = 'payload', type = 'any' },
})

queue:create_index('primary', { parts = {'id'} })
queue:create_index('tube_status', { parts = {'tube', 'status', 'id'} })
queue:create_index('status', { parts = {'status', 'id'} })
queue:create_index('runat', { parts = {'runat', 'id'} })

xqueue.upgrade(queue, {
features = {
id = 'time64',
delayed = true,
},
fields = {
status = 'status',
runat = 'runat',
tube = 'tube',
},
})

local trans = setmetatable({ __cache = {}, },{__index=function(self, tx)
local s,e = unpack(tostring(tx):split("_"))
local n = s:upper().."-"..e:upper()
self[n] = rawget(self, n) or 0
local func = self.__cache[n] or function(inc) self[n] = self[n] + inc end
self.__cache[n] = func
return func
end})

for i = 1, 10 do
local tube_name = 'tube-'..i
local task = queue:put({ tube = tube_name, payload = { id = i } })
t.assert_equals(task.payload.id, i, "task:put("..i..")")
t.assert_items_include(task.payload, { id = i }, "payload of the task remains the same")
end
trans.x_r(10)

for i = 10, 1, -1 do
local tube_name = 'tube-'..i
local task = queue:take({ tube = tube_name })
trans.r_t(1)
t.assert_equals(task.status, 'T', "task has been taken from "..tube_name)
t.assert_equals(task.tube, tube_name, "task was returned from the tube we requested")

local notask = queue:take({
tube = tube_name,
timeout = 0.005,
})

t.assert_equals(notask, nil, "no task should be returned from the empty tube")

if i % 2 == 0 then
local ret = queue:ack(task)
trans.t_x(1)
t.assert_is_not(ret, nil, ":ack() returns task back")
t.assert_equals(task.id, ret.id, ":ack() returned same task")
else
local ret = queue:release(task)
trans.t_r(1)
t.assert_is_not(ret, nil, ":release() returns task back")
t.assert_equals(task.id, ret.id, ":release() returned same task")
t.assert_equals(ret.status, "R", ":release() returns task into Ready status")
end
end

-- now test generic worker
for i = 1, queue:len() do
local task = queue:take(0) -- no-yield take
t.assert_is_not(task, nil, ":take() returned task")
---@cast task -nil

trans.r_t(1)

-- FIFO order of the tubes.
local expected_tube_name = 'tube-'..(2*i-1)
t.assert_equals(task.tube, expected_tube_name, ":take() must follow FIFO and return task from correct tube")

local ret = queue:release(task, { delay = 1 })
trans.t_w(1)
t.assert_equals(ret.status, 'W', ":release(+delay) returns task into W state")
t.assert_equals(ret.id, task.id, ":release() returned the same task we sent to release")
end

local stats = queue:stats()

trans.__cache = nil
setmetatable(trans, nil)
t.assert_covers(stats, {
counts = {
B = 0,
D = 0,
R = 0,
T = 0,
W = 5,
Z = 0,
},
transition = trans,
tube = {},
}, "queue:stats() with tubes")
end
Loading

0 comments on commit 2de0410

Please sign in to comment.