Skip to content

Commit

Permalink
fix: memory leak caused by timer that never quit (#10614)
Browse files (browse the repository at this point in the history)
  • Loading branch information
monkeyDluffy6017 authored Dec 11, 2023
1 parent 7dbabf9 commit 0bf719e
Showing 1 changed file with 86 additions and 75 deletions.
161 changes: 86 additions & 75 deletions apisix/core/config_etcd.lua
Original file line number Diff line number Diff line change
Expand Up @@ -168,119 +168,130 @@ local function do_run_watch(premature)
watch_ctx.rev = rev + 1
watch_ctx.started = true

log.warn("main etcd watcher started, revision=", watch_ctx.rev)
for _, sema in pairs(watch_ctx.wait_init) do
sema:post()
log.info("main etcd watcher started, revision=", watch_ctx.rev)

if watch_ctx.wait_init then
for _, sema in pairs(watch_ctx.wait_init) do
sema:post()
end
watch_ctx.wait_init = nil
end
watch_ctx.wait_init = nil

local opts = {}
opts.timeout = 50 -- second
opts.need_cancel = true
opts.start_revision = watch_ctx.rev

log.info("restart watchdir: start_revision=", opts.start_revision)

local res_func, err, http_cli = watch_ctx.cli:watchdir(watch_ctx.prefix, opts)
if not res_func then
log.error("watchdir err: ", err)
ngx_sleep(3)
return
end

::restart_watch::
::watch_event::
while true do
opts.start_revision = watch_ctx.rev
log.info("restart watchdir: start_revision=", opts.start_revision)
local res_func, err, http_cli = watch_ctx.cli:watchdir(watch_ctx.prefix, opts)
if not res_func then
log.error("watchdir: ", err)
ngx_sleep(3)
goto restart_watch
local res, err = res_func()
if log_level >= NGX_INFO then
log.info("res_func: ", inspect(res))
end

::watch_event::
while true do
local res, err = res_func()
if log_level >= NGX_INFO then
log.info("res_func: ", inspect(res))
if not res then
if err ~= "closed" and
err ~= "timeout" and
err ~= "broken pipe"
then
log.error("wait watch event: ", err)
end
cancel_watch(http_cli)
break
end

if not res then
if err ~= "closed" and
err ~= "timeout" and
err ~= "broken pipe"
then
log.error("wait watch event: ", err)
end
cancel_watch(http_cli)
break
end
if res.error then
log.error("wait watch event: ", inspect(res.error))
cancel_watch(http_cli)
break
end

if res.error then
log.error("wait watch event: ", inspect(res.error))
cancel_watch(http_cli)
break
end
if res.result.created then
goto watch_event
end

if res.result.created then
goto watch_event
if res.result.canceled then
log.warn("watch canceled by etcd, res: ", inspect(res))
if res.result.compact_revision then
watch_ctx.rev = tonumber(res.result.compact_revision)
log.warn("etcd compacted, compact_revision=", watch_ctx.rev)
produce_res(nil, "compacted")
end
cancel_watch(http_cli)
break
end

if res.result.canceled then
log.warn("watch canceled by etcd, res: ", inspect(res))
if res.result.compact_revision then
watch_ctx.rev = tonumber(res.result.compact_revision)
log.warn("etcd compacted, compact_revision=", watch_ctx.rev)
produce_res(nil, "compacted")
end
cancel_watch(http_cli)
break
-- cleanup
local min_idx = 0
for _, idx in pairs(watch_ctx.idx) do
if (min_idx == 0) or (idx < min_idx) then
min_idx = idx
end
end

-- cleanup
local min_idx = 0
for _, idx in pairs(watch_ctx.idx) do
if (min_idx == 0) or (idx < min_idx) then
min_idx = idx
end
end
for i = 1, min_idx - 1 do
watch_ctx.res[i] = false
end

for i = 1, min_idx - 1 do
watch_ctx.res[i] = false
if min_idx > 100 then
for k, idx in pairs(watch_ctx.idx) do
watch_ctx.idx[k] = idx - min_idx + 1
end

if min_idx > 100 then
for k, idx in pairs(watch_ctx.idx) do
watch_ctx.idx[k] = idx - min_idx + 1
end
-- trim the res table
for i = 1, min_idx - 1 do
table.remove(watch_ctx.res, 1)
end
-- trim the res table
for i = 1, min_idx - 1 do
table.remove(watch_ctx.res, 1)
end
end

local rev = tonumber(res.result.header.revision)
if rev > watch_ctx.rev then
watch_ctx.rev = rev + 1
end
produce_res(res)
local rev = tonumber(res.result.header.revision)
if rev > watch_ctx.rev then
watch_ctx.rev = rev + 1
end
produce_res(res)
end
end


-- NOTE(review): this span looks like a rendered diff whose +/- markers and
-- indentation were stripped, so pre-change and post-change lines are
-- interleaved. It is not valid Lua as written (for example, two consecutive
-- `ngx_thread_spawn(function ()` openers share a single `end)`). Confirm
-- against the real file before relying on any line below.
--
-- run_watch(premature): spawns one thread running do_run_watch and another
-- thread that polls exiting() every 0.1 seconds, waits on them with
-- ngx_thread_wait, kills both threads, and then either schedules itself again
-- through ngx_timer_at(0, run_watch) or notifies consumers through
-- produce_res(nil, "worker exited").
-- `premature` is forwarded unchanged to do_run_watch.
local function run_watch(premature)
-- presumably the pre-change spawn (result unchecked) -- TODO confirm
local run_watch_th = ngx_thread_spawn(do_run_watch, premature)
-- presumably the post-change spawn: also captures the error and bails out
local run_watch_th, err = ngx_thread_spawn(do_run_watch, premature)
if not run_watch_th then
log.error("failed to spawn thread do_run_watch: ", err)
return
end

-- label targeted by the `goto restart` further down
::restart::
-- presumably the pre-change opener of the checker thread -- TODO confirm
local check_worker_th = ngx_thread_spawn(function ()
-- presumably the post-change opener: same closure, error captured as well
local check_worker_th, err = ngx_thread_spawn(function ()
-- poll until exiting() becomes true, sleeping 0.1 seconds between checks
while not exiting() do
ngx_sleep(0.1)
end
end)
if not check_worker_th then
log.error("failed to spawn thread check_worker: ", err)
return
end

-- presumably the pre-change wait: only the checker thread is waited on
local ok, err = ngx_thread_wait(check_worker_th)

-- presumably the post-change wait: both threads are passed to the wait call
local ok, err = ngx_thread_wait(run_watch_th, check_worker_th)
if not ok then
-- NOTE(review): "retart" in the message looks like a typo for "restart";
-- it is runtime text, so it is left unchanged here.
log.error("check_worker thread terminates failed, retart checker, error: " .. err)
-- presumably pre-change recovery: kill the checker and jump back to ::restart::
ngx_thread_kill(check_worker_th)
goto restart
end

ngx_thread_kill(run_watch_th)
-- notify child watchers
produce_res(nil, "worker exited")
ngx_thread_kill(check_worker_th)

-- when exiting() is false, schedule run_watch again with a zero-delay timer;
-- otherwise publish the "worker exited" result
if not exiting() then
ngx_timer_at(0, run_watch)
else
-- notify child watchers
produce_res(nil, "worker exited")
end
end


Expand Down

0 comments on commit 0bf719e

Please sign in to comment.