Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support config stream_route upstream in service #10298

Merged
merged 7 commits into from
Oct 15, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions apisix/admin/services.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
--
local core = require("apisix.core")
local get_routes = require("apisix.router").http_routes
local get_stream_routes = require("apisix.router").stream_routes
local apisix_upstream = require("apisix.upstream")
local resource = require("apisix.admin.resource")
local schema_plugin = require("apisix.admin.plugins").check_schema
Expand Down Expand Up @@ -99,6 +100,21 @@ local function delete_checker(id)
end
end

local stream_routes, stream_routes_ver = get_stream_routes()
core.log.info("stream_routes: ", core.json.delay_encode(stream_routes, true))
core.log.info("stream_routes_ver: ", stream_routes_ver)
if stream_routes_ver and stream_routes then
for _, route in ipairs(stream_routes) do
if type(route) == "table" and route.value
and route.value.service_id
and tostring(route.value.service_id) == id then
return 400, {error_msg = "can not delete this service directly,"
.. " stream_route [" .. route.value.id
.. "] is still using it now"}
end
end
end

return nil, nil
end

Expand Down
17 changes: 17 additions & 0 deletions apisix/admin/stream_routes.lua
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,23 @@ local function check_conf(id, conf, need_id, schema)
end
end

local service_id = conf.service_id
if service_id then
local key = "/services/" .. service_id
local res, err = core.etcd.get(key)
if not res then
return nil, {error_msg = "failed to fetch service info by "
.. "service id [" .. service_id .. "]: "
.. err}
end

if res.status ~= 200 then
return nil, {error_msg = "failed to fetch service info by "
.. "service id [" .. service_id .. "], "
.. "response code: " .. res.status}
end
end

local ok, err = stream_route_checker(conf, true)
if not ok then
return nil, {error_msg = err}
Expand Down
2 changes: 1 addition & 1 deletion apisix/http/service.lua
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ function _M.init_worker()
filter = filter,
})
if not services then
error("failed to create etcd instance for fetching upstream: " .. err)
error("failed to create etcd instance for fetching /services: " .. err)
return
end
end
Expand Down
29 changes: 29 additions & 0 deletions apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,7 @@ function _M.stream_init_worker()
plugin.init_worker()
xrpc.init_worker()
router.stream_init_worker()
require("apisix.http.service").init_worker()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the service.lua be moved out of the http directory?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, obviously service is designed for http

apisix_upstream.init_worker()

local we = require("resty.worker.events")
Expand Down Expand Up @@ -1078,6 +1079,34 @@ function _M.stream_preread_phase()

api_ctx.matched_upstream = upstream

elseif matched_route.value.service_id then
local service = service_fetch(matched_route.value.service_id)
if not service then
core.log.error("failed to fetch service configuration by ",
"id: ", matched_route.value.service_id)
return core.response.exit(404)
end

matched_route = plugin.merge_service_stream_route(service, matched_route)
api_ctx.matched_route = matched_route
api_ctx.conf_type = "stream_route&service"
api_ctx.conf_version = matched_route.modifiedIndex .. "&" .. service.modifiedIndex
api_ctx.conf_id = matched_route.value.id .. "&" .. service.value.id
api_ctx.service_id = service.value.id
api_ctx.service_name = service.value.name
api_ctx.matched_upstream = matched_route.value.upstream
if matched_route.value.upstream_id and not matched_route.value.upstream then
local upstream = apisix_upstream.get_by_id(matched_route.value.upstream_id)
if not upstream then
if is_http then
return core.response.exit(502)
end

return ngx_exit(1)
end

api_ctx.matched_upstream = upstream
end
else
if matched_route.has_domain then
local err
Expand Down
44 changes: 44 additions & 0 deletions apisix/plugin.lua
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ local stream_local_plugins_hash = core.table.new(0, 32)
local merged_route = core.lrucache.new({
ttl = 300, count = 512
})
local merged_stream_route = core.lrucache.new({
ttl = 300, count = 512
})
local expr_lrucache = core.lrucache.new({
ttl = 300, count = 512
})
Expand Down Expand Up @@ -636,6 +639,47 @@ function _M.merge_service_route(service_conf, route_conf)
service_conf, route_conf)
end

local function merge_service_stream_route(service_conf, route_conf)
-- because many fields in Service are not supported by stream route,
-- so we copy the stream route as base object
local new_conf = core.table.deepcopy(route_conf)
if service_conf.value.plugins then
for name, conf in pairs(service_conf.value.plugins) do
if not new_conf.value.plugins then
new_conf.value.plugins = {}
end

if not new_conf.value.plugins[name] then
new_conf.value.plugins[name] = conf
end
end
end

new_conf.value.service_id = nil

if not new_conf.value.upstream and service_conf.value.upstream then
new_conf.value.upstream = service_conf.value.upstream
end

if not new_conf.value.upstream_id and service_conf.value.upstream_id then
new_conf.value.upstream_id = service_conf.value.upstream_id
end

--core.log.info("merged stream_route conf : ", require("inspect")(new_conf))
return new_conf
end

function _M.merge_service_stream_route(service_conf, route_conf)
core.log.info("service conf: ", core.json.delay_encode(service_conf, true))
core.log.info(" stream route conf: ", core.json.delay_encode(route_conf, true))

local version = route_conf.modifiedIndex .. "#" .. service_conf.modifiedIndex
local route_service_key = route_conf.value.id .. "#"
.. version
return merged_stream_route(route_service_key, version,
merge_service_stream_route,
service_conf, route_conf)
end

local function merge_consumer_route(route_conf, consumer_conf, consumer_group_conf)
if not consumer_conf.plugins or
Expand Down
1 change: 1 addition & 0 deletions apisix/schema_def.lua
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,7 @@ _M.stream_route = {
},
upstream = upstream_schema,
upstream_id = id_schema,
service_id = id_schema,
plugins = plugins_schema,
protocol = xrpc_protocol_schema,
}
Expand Down
4 changes: 4 additions & 0 deletions apisix/stream/router/ip_port.lua
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ do
for _, route in ipairs(items) do
local hit = match_addrs(route, vars)
if hit then
route.value.remote_addr_matcher = nil
route.value.server_addr_matcher = nil
ctx.matched_route = route
return true
end
Expand Down Expand Up @@ -175,6 +177,8 @@ do
for _, route in ipairs(other_routes) do
local hit = match_addrs(route, api_ctx.var)
if hit then
route.value.remote_addr_matcher = nil
route.value.server_addr_matcher = nil
api_ctx.matched_route = route
return true
end
Expand Down
Loading