Skip to content

Commit

Permalink
fix: create the health checker in access phase (apache#3240)
Browse files Browse the repository at this point in the history
Allow calling yield functions in the health checker.

Signed-off-by: spacewander <[email protected]>
Co-authored-by: YuanSheng Wang <[email protected]>

Fix apache#1851
Fix apache#2842
  • Loading branch information
spacewander authored and sysulq committed Jan 15, 2021
1 parent eeb3bea commit 079dabb
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 100 deletions.
91 changes: 2 additions & 89 deletions apisix/balancer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,10 @@
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local healthcheck
local require = require
local discovery = require("apisix.discovery.init").discovery
local balancer = require("ngx.balancer")
local core = require("apisix.core")
local ipairs = ipairs
local tostring = tostring
local set_more_tries = balancer.set_more_tries
local get_last_failure = balancer.get_last_failure
local set_timeouts = balancer.set_timeouts
Expand All @@ -36,9 +33,6 @@ local pickers = {
local lrucache_server_picker = core.lrucache.new({
ttl = 300, count = 256
})
local lrucache_checker = core.lrucache.new({
ttl = 300, count = 256
})
local lrucache_addr = core.lrucache.new({
ttl = 300, count = 1024 * 4
})
Expand Down Expand Up @@ -83,67 +77,6 @@ local function fetch_health_nodes(upstream, checker)
end


-- Create a resty.healthcheck checker for the given upstream and register a
-- cleanup handler on the owning config object so the checker is released
-- (cleared and stopped) when that config is replaced.
-- @param upstream            table holding `checks`, `nodes` and optionally `parent`
-- @param healthcheck_parent  config object whose `key` names the checker and
--                            whose `clean_handlers` receive the release hook
--                            when `upstream.parent` is absent
-- @return the checker instance, or nothing when creation failed
local function create_checker(upstream, healthcheck_parent)
    -- lazy require: only load resty.healthcheck when health checks are used
    if healthcheck == nil then
        healthcheck = require("resty.healthcheck")
    end

    local checker, err = healthcheck.new({
        name = "upstream#" .. healthcheck_parent.key,
        shm_name = "upstream-healthcheck",
        checks = upstream.checks,
    })

    if not checker then
        core.log.error("fail to create healthcheck instance: ", err)
        return
    end

    -- the active-check host/port, when configured, override each node's own
    local host = upstream.checks and upstream.checks.active and upstream.checks.active.host
    local port = upstream.checks and upstream.checks.active and upstream.checks.active.port
    for _, node in ipairs(upstream.nodes) do
        local ok, add_err = checker:add_target(node.host, port or node.port, host)
        if not ok then
            core.log.error("failed to add new health check target: ", node.host, ":",
                           port or node.port, " err: ", add_err)
        end
    end

    -- The original if/else registered byte-identical closures on two possible
    -- parents; pick the owning table once and register a single handler.
    local owner = upstream.parent or healthcheck_parent
    core.table.insert(owner.clean_handlers, function ()
        core.log.info("try to release checker: ", tostring(checker))
        checker:clear()
        checker:stop()
    end)

    core.log.info("create new checker: ", tostring(checker))
    return checker
end


-- Fetch (or lazily create via the LRU cache) the health checker for an
-- upstream. Yields nothing when health checks are not configured or when a
-- checker has already been attached to the upstream.
local function fetch_healthchecker(upstream, healthcheck_parent, version)
    -- a single guard covers both "no checks configured" and "already created"
    if not upstream.checks or upstream.checker then
        return
    end

    -- extra parentheses truncate to exactly one return value, matching the
    -- original local-then-return form
    return (lrucache_checker(upstream, version,
                             create_checker, upstream, healthcheck_parent))
end


local function create_server_picker(upstream, checker)
local picker = pickers[upstream.type]
if picker then
Expand All @@ -167,25 +100,6 @@ local function pick_server(route, ctx)
core.log.info("route: ", core.json.delay_encode(route, true))
core.log.info("ctx: ", core.json.delay_encode(ctx, true))
local up_conf = ctx.upstream_conf
if up_conf.service_name then
if not discovery then
return nil, "discovery is uninitialized"
end
if not up_conf.discovery_type then
return nil, "discovery server need appoint"
end

local dis = discovery[up_conf.discovery_type]
if not dis then
return nil, "discovery is uninitialized"
end
up_conf.nodes = dis.nodes(up_conf.service_name)
end

local nodes_count = up_conf.nodes and #up_conf.nodes or 0
if nodes_count == 0 then
return nil, "no valid upstream node"
end

if ctx.server_picker and ctx.server_picker.after_balance then
ctx.server_picker.after_balance(ctx, true)
Expand All @@ -200,18 +114,17 @@ local function pick_server(route, ctx)
end
end

local nodes_count = #up_conf.nodes
if nodes_count == 1 then
local node = up_conf.nodes[1]
ctx.balancer_ip = node.host
ctx.balancer_port = node.port
return node
end

local healthcheck_parent = ctx.upstream_healthcheck_parent
local version = ctx.upstream_version
local key = ctx.upstream_key
local checker = fetch_healthchecker(up_conf, healthcheck_parent, version)
ctx.up_checker = checker
local checker = ctx.up_checker

ctx.balancer_try_count = (ctx.balancer_try_count or 0) + 1
if checker and ctx.balancer_try_count > 1 then
Expand Down
12 changes: 6 additions & 6 deletions apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -536,10 +536,10 @@ function _M.http_access_phase()
run_plugin("access", plugins, api_ctx)
end

local ok, err = set_upstream(route, api_ctx)
if not ok then
core.log.error("failed to parse upstream: ", err)
core.response.exit(500)
local code, err = set_upstream(route, api_ctx)
if code then
core.log.error("failed to set upstream: ", err)
core.response.exit(code)
end

set_upstream_host(api_ctx)
Expand Down Expand Up @@ -893,8 +893,8 @@ function _M.stream_preread_phase()

run_plugin("preread", plugins, api_ctx)

local ok, err = set_upstream(matched_route, api_ctx)
if not ok then
local code, err = set_upstream(matched_route, api_ctx)
if code then
core.log.error("failed to set upstream: ", err)
return ngx_exit(1)
end
Expand Down
103 changes: 99 additions & 4 deletions apisix/upstream.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,20 @@
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

local require = require
local core = require("apisix.core")
local discovery = require("apisix.discovery.init").discovery
local error = error
local tostring = tostring
local ipairs = ipairs
local pairs = pairs
local upstreams
local healthcheck


local lrucache_checker = core.lrucache.new({
ttl = 300, count = 256
})


local _M = {}
Expand Down Expand Up @@ -52,22 +59,110 @@ end
_M.set = set_directly


-- Create a resty.healthcheck checker for the given upstream and register a
-- cleanup handler on the owning config object so the checker is released
-- (cleared and stopped) when that config is replaced.
-- @param upstream            table holding `checks`, `nodes` and optionally `parent`
-- @param healthcheck_parent  config object whose `key` names the checker and
--                            whose `clean_handlers` receive the release hook
--                            when `upstream.parent` is absent
-- @return the checker instance, or nil when creation failed
local function create_checker(upstream, healthcheck_parent)
    -- lazy require: only load resty.healthcheck when health checks are used
    if healthcheck == nil then
        healthcheck = require("resty.healthcheck")
    end

    local checker, err = healthcheck.new({
        name = "upstream#" .. healthcheck_parent.key,
        shm_name = "upstream-healthcheck",
        checks = upstream.checks,
    })

    if not checker then
        core.log.error("fail to create healthcheck instance: ", err)
        return nil
    end

    -- the active-check host/port, when configured, override each node's own
    local host = upstream.checks and upstream.checks.active and upstream.checks.active.host
    local port = upstream.checks and upstream.checks.active and upstream.checks.active.port
    for _, node in ipairs(upstream.nodes) do
        local ok, add_err = checker:add_target(node.host, port or node.port, host)
        if not ok then
            core.log.error("failed to add new health check target: ", node.host, ":",
                           port or node.port, " err: ", add_err)
        end
    end

    -- The original if/else registered byte-identical closures on two possible
    -- parents; pick the owning table once and register a single handler.
    local owner = upstream.parent or healthcheck_parent
    core.table.insert(owner.clean_handlers, function ()
        core.log.info("try to release checker: ", tostring(checker))
        checker:clear()
        checker:stop()
    end)

    core.log.info("create new checker: ", tostring(checker))
    return checker
end


-- Fetch the cached health checker for an upstream, creating it through the
-- LRU cache on first use. Returns nothing when the upstream has no health
-- check configuration or a checker is already present.
local function fetch_healthchecker(upstream, healthcheck_parent, version)
    if not upstream.checks then
        return
    end
    if upstream.checker then
        return
    end

    local checker = lrucache_checker(upstream, version,
                                     create_checker, upstream,
                                     healthcheck_parent)
    return checker
end


-- Resolve and attach the upstream for the matched route: resolves service
-- discovery nodes when configured, binds the upstream to the request context,
-- validates that usable nodes exist, and (in the access phase, where yielding
-- is allowed) creates the health checker for multi-node upstreams.
-- @return nothing on success; otherwise an HTTP status code and error message
function _M.set_by_route(route, api_ctx)
    if api_ctx.upstream_conf then
        core.log.warn("upstream node has been specified, ",
                      "cannot be set repeatedly")
        return
    end

    local up_conf = api_ctx.matched_upstream
    if not up_conf then
        return 500, "missing upstream configuration in Route or Service"
    end
    -- core.log.info("up_conf: ", core.json.delay_encode(up_conf, true))

    if up_conf.service_name then
        if not discovery then
            return 500, "discovery is uninitialized"
        end
        if not up_conf.discovery_type then
            return 500, "discovery server need appoint"
        end

        local dis = discovery[up_conf.discovery_type]
        if not dis then
            -- fixed: the original concatenation lacked a space before "is"
            return 500, "discovery " .. up_conf.discovery_type .. " is uninitialized"
        end
        up_conf.nodes = dis.nodes(up_conf.service_name)
    end

    set_directly(api_ctx, up_conf.type .. "#upstream_" .. tostring(up_conf),
                 api_ctx.conf_version, up_conf, route)

    local nodes_count = up_conf.nodes and #up_conf.nodes or 0
    if nodes_count == 0 then
        return 502, "no valid upstream node"
    end

    -- a single node never needs health checking; for more, create the checker
    -- here (access phase) because resty.healthcheck may yield
    if nodes_count > 1 then
        local checker = fetch_healthchecker(up_conf, route, api_ctx.upstream_version)
        api_ctx.up_checker = checker
    end

    return
end


Expand Down
2 changes: 1 addition & 1 deletion t/discovery/eureka.t
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ routes:
GET /eureka/apps/APISIX-EUREKA
--- error_code: 502
--- error_log eval
qr/.*failed to pick server: no valid upstream node.*/
qr/.* no valid upstream node.*/
Expand Down

0 comments on commit 079dabb

Please sign in to comment.