Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(pluginserver): reset instance triggers invalidation #11819

Merged
merged 2 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions kong/runloop/plugin_servers/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ local exposed_api = {

local get_instance_id
local reset_instance
local reset_instances_for_plugin

local protocol_implementations = {
["MsgPack:1"] = "kong.runloop.plugin_servers.mp_rpc",
Expand Down Expand Up @@ -204,14 +205,15 @@ function get_instance_id(plugin_name, conf)
-- to prevent a potential infinite loop when someone fails to release the ID
wait_count = wait_count + 1
if wait_count > MAX_WAIT_STEPS then
running_instances[key] = nil
return nil, "Could not claim instance_id for " .. plugin_name .. " (key: " .. key .. ")"
end
instance_info = running_instances[key]
end

if instance_info
and instance_info.id
and instance_info.seq == conf.__seq__
and instance_info.conf and instance_info.conf.__key__ == key
then
-- exact match, return it
return instance_info.id
Expand All @@ -222,7 +224,6 @@ function get_instance_id(plugin_name, conf)
-- we're the first, put something to claim
instance_info = {
conf = conf,
seq = conf.__seq__,
}
running_instances[key] = instance_info
else
Expand All @@ -243,8 +244,8 @@ function get_instance_id(plugin_name, conf)
end

instance_info.id = new_instance_info.id
instance_info.plugin_name = plugin_name
instance_info.conf = new_instance_info.conf
instance_info.seq = new_instance_info.seq
instance_info.Config = new_instance_info.Config
instance_info.rpc = new_instance_info.rpc

Expand All @@ -257,22 +258,28 @@ function get_instance_id(plugin_name, conf)
return instance_info.id
end

--- Drops every entry of `running_instances` recorded for the given plugin.
-- Entries are matched on their `plugin_name` field; all other entries are
-- left in place.
-- @param plugin_name value compared against each entry's `plugin_name`
function reset_instances_for_plugin(plugin_name)
  for key, info in pairs(running_instances) do
    local belongs_to_plugin = info.plugin_name == plugin_name
    if belongs_to_plugin then
      -- assigning nil to an existing key is permitted while traversing
      running_instances[key] = nil
    end
  end
end

--- reset_instance: removes an instance from the table.
function reset_instance(plugin_name, conf)
local key = type(conf) == "table" and kong.plugin.get_id() or plugin_name
local current_instance = running_instances[key]

--
-- the same plugin (which acts as a plugin server) is shared among
-- instances of the plugin; for example, the same plugin can be applied
-- to many routes
-- `reset_instance` is called when (but not only when) the plugin server has died;
-- in such a case, all associated instances must be removed, not only the current one
--
for k, instance in pairs(running_instances) do
if instance.rpc == current_instance.rpc then
running_instances[k] = nil
end
reset_instances_for_plugin(plugin_name)

local ok, err = kong.worker_events.post("plugin_server", "reset_instances", { plugin_name = plugin_name })
if not ok then
kong.log.err("failed to post plugin_server reset_instances event: ", err)
end
end

Expand Down Expand Up @@ -390,7 +397,7 @@ function plugin_servers.start()

-- in case plugin server restarts, all workers need to update their defs
kong.worker_events.register(function (data)
reset_instance(data.plugin_name, data.conf)
reset_instances_for_plugin(data.plugin_name)
end, "plugin_server", "reset_instances")
end

Expand Down
26 changes: 15 additions & 11 deletions kong/runloop/plugin_servers/pb_rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,9 @@ function Rpc:call_start_instance(plugin_name, conf)
return nil, err
end

kong.log.debug("started plugin server: seq ", conf.__seq__, ", worker ", ngx.worker.id(), ", instance id ",
status.instance_status.instance_id)

return {
id = status.instance_status.instance_id,
conf = conf,
Expand Down Expand Up @@ -399,19 +402,20 @@ function Rpc:handle_event(plugin_name, conf, phase)
end

if not res or res == "" then
local ok, err2 = kong.worker_events.post("plugin_server", "reset_instances",
{ plugin_name = plugin_name, conf = conf })
if not ok then
kong.log.err("failed to post plugin_server reset_instances event: ", err2)
end
if err then
local err_lowered = err and err:lower() or ""

local err_lowered = err and err:lower() or ""
if str_find(err_lowered, "no plugin instance")
or str_find(err_lowered, "closed") then
kong.log.warn(err)
return self:handle_event(plugin_name, conf, phase)
kong.log.err(err_lowered)

if err_lowered == "not ready" then
self.reset_instance(plugin_name, conf)
end
if str_find(err_lowered, "no plugin instance")
Comment on lines +410 to +413
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to exit here when err_lowered is present?

or str_find(err_lowered, "closed") then
self.reset_instance(plugin_name, conf)
return self:handle_event(plugin_name, conf, phase)
end
end
kong.log.err(err)
end
end

Expand Down
10 changes: 7 additions & 3 deletions kong/runloop/plugins_iterator.lua
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ do
end


local NEXT_SEQ = 0
local PLUGINS_NS = "plugins." .. subsystem
local ENABLED_PLUGINS
local LOADED_PLUGINS
Expand Down Expand Up @@ -170,8 +169,13 @@ local function get_plugin_config(plugin, name, ws_id)
-- TODO: deprecate usage of __key__ as id of plugin
if not cfg.__key__ then
cfg.__key__ = key
cfg.__seq__ = NEXT_SEQ
NEXT_SEQ = NEXT_SEQ + 1
-- generate a sequence number that is unique across workers
-- with a seq of 0, the plugin server generates an unused random instance id
local next_seq, err = ngx.shared.kong:incr("plugins_iterator:__seq__", 1, 0, 0)
if err then
next_seq = 0
end
cfg.__seq__ = next_seq
end

return cfg
Expand Down