Skip to content

Commit

Permalink
refactor: the parent of upstream should point to its original src (#3287
Browse files Browse the repository at this point in the history
)

Signed-off-by: spacewander <[email protected]>
  • Loading branch information
spacewander authored Jan 15, 2021
1 parent 5bdfb8f commit bbbdf58
Show file tree
Hide file tree
Showing 10 changed files with 342 additions and 54 deletions.
45 changes: 32 additions & 13 deletions apisix/core/table.lua
Original file line number Diff line number Diff line change
Expand Up @@ -96,25 +96,44 @@ function _M.setmt__gc(t, mt)
end


local function deepcopy(orig)
local orig_type = type(orig)
if orig_type ~= 'table' then
return orig
end
local deepcopy
do
local function _deepcopy(orig, copied)
-- prevent infinite loop when a field refers its parent
copied[orig] = true
-- If the array-like table contains nil in the middle,
-- the len might be smaller than the expected.
-- But it doesn't affect the correctness.
local len = #orig
local copy = new_tab(len, nkeys(orig) - len)
for orig_key, orig_value in pairs(orig) do
if type(orig_value) == "table" and not copied[orig_value] then
copy[orig_key] = _deepcopy(orig_value, copied)
else
copy[orig_key] = orig_value
end
end

-- If the array-like table contains nil in the middle,
-- the len might be smaller than the expected.
-- But it doesn't affect the correctness.
local len = #orig
local copy = new_tab(len, nkeys(orig) - len)
for orig_key, orig_value in pairs(orig) do
copy[orig_key] = deepcopy(orig_value)
return copy
end

return copy

local copied_recorder = {}

function deepcopy(orig)
local orig_type = type(orig)
if orig_type ~= 'table' then
return orig
end

local res = _deepcopy(orig, copied_recorder)
_M.clear(copied_recorder)
return res
end
end
_M.deepcopy = deepcopy


local ngx_null = ngx.null
local function merge(origin, extend)
for k,v in pairs(extend) do
Expand Down
1 change: 1 addition & 0 deletions apisix/http/service.lua
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ local function filter(service)
service.value.upstream.nodes = new_nodes
end

service.value.upstream.parent = service
core.log.info("filter service: ", core.json.delay_encode(service))
end

Expand Down
7 changes: 2 additions & 5 deletions apisix/plugin.lua
Original file line number Diff line number Diff line change
Expand Up @@ -401,11 +401,8 @@ local function merge_service_route(service_conf, route_conf)
local route_upstream = route_conf.value.upstream
if route_upstream then
new_conf.value.upstream = route_upstream

if route_upstream.checks then
route_upstream.parent = route_conf
end

-- when route's upstream override service's upstream,
-- the upstream.parent still point to the route
new_conf.value.upstream_id = nil
new_conf.has_domain = route_conf.has_domain
end
Expand Down
2 changes: 1 addition & 1 deletion apisix/plugins/example-plugin.lua
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ function _M.access(conf, ctx)

local matched_route = ctx.matched_route
upstream.set(ctx, up_conf.type .. "#route_" .. matched_route.value.id,
ctx.conf_version, up_conf, matched_route)
ctx.conf_version, up_conf)
return
end

Expand Down
4 changes: 3 additions & 1 deletion apisix/plugins/traffic-split.lua
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,16 @@ local function set_upstream(upstream_info, ctx)

local ok, err = upstream.check_schema(up_conf)
if not ok then
core.log.error("failed to validate generated upstream: ", err)
return 500, err
end

local matched_route = ctx.matched_route
up_conf.parent = matched_route
local upstream_key = up_conf.type .. "#route_" ..
matched_route.value.id .. "_" ..upstream_info.vid
core.log.info("upstream_key: ", upstream_key)
upstream.set(ctx, upstream_key, ctx.conf_version, up_conf, matched_route)
upstream.set(ctx, upstream_key, ctx.conf_version, up_conf)

return
end
Expand Down
1 change: 1 addition & 0 deletions apisix/router.lua
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ local function filter(route)
route.value.upstream.nodes = new_nodes
end

route.value.upstream.parent = route
core.log.info("filter route: ", core.json.delay_encode(route))
end

Expand Down
2 changes: 1 addition & 1 deletion apisix/stream/plugins/mqtt-proxy.lua
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ function _M.preread(conf, ctx)

local matched_route = ctx.matched_route
upstream.set(ctx, up_conf.type .. "#route_" .. matched_route.value.id,
ctx.conf_version, up_conf, matched_route)
ctx.conf_version, up_conf)
return
end

Expand Down
39 changes: 14 additions & 25 deletions apisix/upstream.lua
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ local lrucache_checker = core.lrucache.new({
local _M = {}


local function set_directly(ctx, key, ver, conf, parent)
local function set_directly(ctx, key, ver, conf)
if not ctx then
error("missing argument ctx", 2)
end
Expand All @@ -46,24 +46,22 @@ local function set_directly(ctx, key, ver, conf, parent)
if not conf then
error("missing argument conf", 2)
end
if not parent then
error("missing argument parent", 2)
end

ctx.upstream_conf = conf
ctx.upstream_version = ver
ctx.upstream_key = key
ctx.upstream_healthcheck_parent = parent
ctx.upstream_healthcheck_parent = conf.parent
return
end
_M.set = set_directly


local function create_checker(upstream, healthcheck_parent)
local function create_checker(upstream)
if healthcheck == nil then
healthcheck = require("resty.healthcheck")
end

local healthcheck_parent = upstream.parent
local checker, err = healthcheck.new({
name = "upstream#" .. healthcheck_parent.key,
shm_name = "upstream-healthcheck",
Expand All @@ -85,27 +83,18 @@ local function create_checker(upstream, healthcheck_parent)
end
end

if upstream.parent then
core.table.insert(upstream.parent.clean_handlers, function ()
core.log.info("try to release checker: ", tostring(checker))
checker:clear()
checker:stop()
end)

else
core.table.insert(healthcheck_parent.clean_handlers, function ()
core.log.info("try to release checker: ", tostring(checker))
checker:clear()
checker:stop()
end)
end
core.table.insert(healthcheck_parent.clean_handlers, function ()
core.log.info("try to release checker: ", tostring(checker))
checker:clear()
checker:stop()
end)

core.log.info("create new checker: ", tostring(checker))
return checker
end


local function fetch_healthchecker(upstream, healthcheck_parent, version)
local function fetch_healthchecker(upstream, version)
if not upstream.checks then
return
end
Expand All @@ -115,8 +104,7 @@ local function fetch_healthchecker(upstream, healthcheck_parent, version)
end

local checker = lrucache_checker(upstream, version,
create_checker, upstream,
healthcheck_parent)
create_checker, upstream)
return checker
end

Expand Down Expand Up @@ -150,15 +138,15 @@ function _M.set_by_route(route, api_ctx)
end

set_directly(api_ctx, up_conf.type .. "#upstream_" .. tostring(up_conf),
api_ctx.conf_version, up_conf, route)
api_ctx.conf_version, up_conf)

local nodes_count = up_conf.nodes and #up_conf.nodes or 0
if nodes_count == 0 then
return 502, "no valid upstream node"
end

if nodes_count > 1 then
local checker = fetch_healthchecker(up_conf, route, api_ctx.upstream_version)
local checker = fetch_healthchecker(up_conf, api_ctx.upstream_version)
api_ctx.up_checker = checker
end

Expand Down Expand Up @@ -219,6 +207,7 @@ function _M.init_worker()
upstream.value.nodes = new_nodes
end

upstream.value.parent = upstream
core.log.info("filter upstream: ", core.json.delay_encode(upstream))
end,
})
Expand Down
120 changes: 117 additions & 3 deletions t/node/healthcheck.t
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,10 @@ GET /t
--- response_body
[{"count":12,"port":"1980"}]
--- grep_error_log eval
qr/unhealthy .* for '.*'/
qr/\([^)]+\) unhealthy .* for '.*'/
--- grep_error_log_out
unhealthy TCP increment (1/2) for 'foo.com(127.0.0.1:1970)'
unhealthy TCP increment (2/2) for 'foo.com(127.0.0.1:1970)'
(upstream#/apisix/routes/1) unhealthy TCP increment (1/2) for 'foo.com(127.0.0.1:1970)'
(upstream#/apisix/routes/1) unhealthy TCP increment (2/2) for 'foo.com(127.0.0.1:1970)'
--- timeout: 10


Expand Down Expand Up @@ -795,3 +795,117 @@ GET /t
qr/expected 65536 to be smaller than 65535/
--- error_code chomp
400



=== TEST 18: set route + upstream (two upstream node: one healthy + one unhealthy)
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/upstreams/1',
ngx.HTTP_PUT,
[[{
"type": "roundrobin",
"nodes": {
"127.0.0.1:1980": 1,
"127.0.0.1:1970": 1
},
"checks": {
"active": {
"http_path": "/status",
"host": "foo.com",
"healthy": {
"interval": 1,
"successes": 1
},
"unhealthy": {
"interval": 1,
"http_failures": 2
}
}
}
}]]
)

if code >= 300 then
ngx.status = code
ngx.say(body)
return
end

local code, body = t('/apisix/admin/routes/1',
ngx.HTTP_PUT,
[[{
"uri": "/server_port",
"upstream_id": 1
}]]
)

if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- request
GET /t
--- response_body
passed
--- grep_error_log eval
qr/^.*?\[error\](?!.*process exiting).*/
--- grep_error_log_out



=== TEST 19: hit routes, ensure the checker is bound to the upstream
--- config
location /t {
content_by_lua_block {
local http = require "resty.http"
local uri = "http://127.0.0.1:" .. ngx.var.server_port
.. "/server_port"

do
local httpc = http.new()
local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false})
end

ngx.sleep(2.5)

local ports_count = {}
for i = 1, 12 do
local httpc = http.new()
local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false})
if not res then
ngx.say(err)
return
end

ports_count[res.body] = (ports_count[res.body] or 0) + 1
end

local ports_arr = {}
for port, count in pairs(ports_count) do
table.insert(ports_arr, {port = port, count = count})
end

local function cmd(a, b)
return a.port > b.port
end
table.sort(ports_arr, cmd)

ngx.say(require("toolkit.json").encode(ports_arr))
ngx.exit(200)
}
}
--- request
GET /t
--- response_body
[{"count":12,"port":"1980"}]
--- grep_error_log eval
qr/\([^)]+\) unhealthy .* for '.*'/
--- grep_error_log_out
(upstream#/apisix/upstreams/1) unhealthy TCP increment (1/2) for 'foo.com(127.0.0.1:1970)'
(upstream#/apisix/upstreams/1) unhealthy TCP increment (2/2) for 'foo.com(127.0.0.1:1970)'
--- timeout: 10
Loading

0 comments on commit bbbdf58

Please sign in to comment.