Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch Processor: Fix for rescheduling execution for max_retry_count is 0 and 1 #1349

Merged
merged 5 commits into from
Mar 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 12 additions & 15 deletions lua/apisix/utils/batch-processor.lua
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
local core = require("apisix.core")
local setmetatable = setmetatable
local timer_at = ngx.timer.at
local fmt = string.format
local ipairs = ipairs
local table = table
local now = ngx.now
Expand Down Expand Up @@ -57,22 +56,20 @@ function execute_func(premature, batch_processor, batch)
return
end

local ok, err = batch_processor.func(batch.entries)
local ok, err = batch_processor.func(batch.entries, batch_processor.batch_max_size)
if not ok then
core.log.error("Batch Processor[", batch_processor.name, "] failed to process entries: ", err)
batch.retry_count = batch.retry_count + 1
if batch.retry_count < batch_processor.max_retry_count then
core.log.warn(fmt("Batch Processor[%s] failed to process entries: ",
batch_processor.name), err)
if batch.retry_count <= batch_processor.max_retry_count then
schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
else
core.log.error(fmt(("Batch Processor[%s] exceeded the max_retry_count[%d] "
.. "dropping the entries"), batch_processor.name, batch.retry_count))
core.log.error("Batch Processor[", batch_processor.name,"] exceeded ",
"the max_retry_count[", batch.retry_count,"] dropping the entries")
end
return
end

core.log.debug(fmt("Batch Processor[%s] successfully processed the entries",
batch_processor.name))
core.log.debug("Batch Processor[", batch_processor.name ,"] successfully processed the entries")
end


Expand All @@ -83,15 +80,15 @@ local function flush_buffer(premature, batch_processor)

if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
core.log.debug(fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush",
batch_processor.name))
core.log.debug("Batch Processor[", batch_processor.name ,"] buffer ",
"duration exceeded, activating buffer flush")
batch_processor:process_buffer()
batch_processor.is_timer_running = false
return
end

-- buffer duration did not exceed or the buffer is active, extending the timer
core.log.debug(fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name))
core.log.debug("Batch Processor[", batch_processor.name ,"] extending buffer timer")
create_buffer_timer(batch_processor)
end

Expand Down Expand Up @@ -152,7 +149,7 @@ function Batch_Processor:push(entry)
self.last_entry_t = now()

if self.batch_max_size <= #entries then
core.log.debug(fmt("batch processor[%s] batch max size has exceeded", self.name))
core.log.debug("Batch Processor[", self.name ,"] batch max size has exceeded")
self:process_buffer()
end

Expand All @@ -165,8 +162,8 @@ end
function Batch_Processor:process_buffer()
-- If entries are present in the buffer move the entries to processing
if #self.entry_buffer.entries > 0 then
core.log.debug(fmt("tranferring buffer entries to processing pipe line, buffercount[%d]",
#self.entry_buffer.entries))
core.log.debug("tranferring buffer entries to processing pipe line, ",
"buffercount[", #self.entry_buffer.entries ,"]")
self.batch_to_process[#self.batch_to_process + 1] = self.entry_buffer
self.entry_buffer = { entries = {}, retry_count = 0 }
end
Expand Down
27 changes: 13 additions & 14 deletions t/utils/batch-processor.t
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ GET /t
--- response_body
done
--- error_log
BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
Batch Processor[log buffer] buffer duration exceeded, activating buffer flush
Batch Processor[log buffer] successfully processed the entries
--- wait: 3

Expand All @@ -134,9 +134,9 @@ Batch Processor[log buffer] successfully processed the entries
content_by_lua_block {
local Batch = require("apisix.utils.batch-processor")
local config = {
max_retry_count = 2,
max_retry_count = 2,
batch_max_size = 2,
retry_delay = 0,
retry_delay = 0,
}
local func_to_send = function(elements)
return true
Expand All @@ -157,11 +157,11 @@ GET /t
--- response_body
done
--- no_error_log
BatchProcessor[log buffer] activating flush due to no activity
Batch Processor[log buffer] buffer duration exceeded, activating buffer flush
--- error_log
batch processor[log buffer] batch max size has exceeded
Batch Processor[log buffer] batch max size has exceeded
Batch Processor[log buffer] successfully processed the entries
--- wait: 0.5
--- wait: 1



Expand Down Expand Up @@ -235,7 +235,7 @@ GET /t
--- response_body
done
--- no_error_log
BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
Batch Processor[log buffer] buffer duration exceeded, activating buffer flush
--- error_log
Batch Processor[log buffer] failed to process entries
Batch Processor[log buffer] exceeded the max_retry_count
Expand Down Expand Up @@ -278,7 +278,7 @@ GET /t
--- response_body
done
--- no_error_log
BatchProcessor[log buffer] activating flush due to no activity
Batch Processor[log buffer] activating flush due to no activity
--- error_log
batch[1] sent
batch[2] sent
Expand Down Expand Up @@ -315,8 +315,7 @@ GET /t
--- response_body
done
--- no_error_log
BatchProcessor[log buffer] activating flush due to no activity
Batch Processor[log buffer] failed to process entries
Batch Processor[log buffer] activating flush due to no activity
--- error_log
Batch Processor[log buffer] exceeded the max_retry_count
--- wait: 0.5
Expand Down Expand Up @@ -353,7 +352,7 @@ GET /t
--- response_body
done
--- error_log
BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
Batch Processor[log buffer] buffer duration exceeded, activating buffer flush
Batch Processor[log buffer] successfully processed the entries
--- wait: 3

Expand Down Expand Up @@ -392,7 +391,7 @@ GET /t
--- response_body
done
--- no_error_log
BatchProcessor[log buffer] activating flush due to no activity
Batch Processor[log buffer] activating flush due to no activity
--- error_log
[{"msg":"1"},{"msg":"2"}]
[{"msg":"3"},{"msg":"4"}]
Expand Down Expand Up @@ -435,7 +434,7 @@ GET /t
--- response_body
done
--- no_error_log
BatchProcessor[log buffer] activating flush due to no activity
Batch Processor[log buffer] activating flush due to no activity
--- error_log
BatchProcessor[log buffer] extending buffer timer
Batch Processor[log buffer] extending buffer timer
--- wait: 3