Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support hook response body for ext-plugin #6968

Merged
merged 23 commits into from
May 10, 2022
1 change: 1 addition & 0 deletions apisix/constants.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ return {
RPC_PREPARE_CONF = 1,
RPC_HTTP_REQ_CALL = 2,
RPC_EXTRA_INFO = 3,
RPC_HTTP_RESP_CALL = 4,
HTTP_ETCD_DIRECTORY = {
["/upstreams"] = true,
["/plugins"] = true,
Expand Down
172 changes: 172 additions & 0 deletions apisix/plugins/ext-plugin-post-resp.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local ext = require("apisix.plugins.ext-plugin.init")
local constants = require("apisix.constants")
local http = require("resty.http")

local ngx = ngx
local ngx_print = ngx.print
local ngx_flush = ngx.flush
local string = string
local str_sub = string.sub


local name = "ext-plugin-post-resp"
local _M = {
version = 0.1,
priority = -4000,
name = name,
schema = ext.schema,
}


local function include_req_headers(ctx)
-- TODO: handle proxy_set_header
return core.request.headers(ctx)
end


local function close(http_obj)
-- TODO: keepalive
local ok, err = http_obj:close()
if not ok then
core.log.error("close http object failed: ", err)
end
end


local function get_response(ctx, http_obj)
local ok, err = http_obj:connect({
scheme = ctx.upstream_scheme,
host = ctx.picked_server.host,
port = ctx.picked_server.port,
})

if not ok then
return nil, err
end
-- TODO: set timeout
local uri, args
if ctx.var.upstream_uri == "" then
-- use original uri instead of rewritten one
uri = ctx.var.uri
else
uri = ctx.var.upstream_uri

-- the rewritten one may contain new args
local index = core.string.find(uri, "?")
if index then
local raw_uri = uri
uri = str_sub(raw_uri, 1, index - 1)
args = str_sub(raw_uri, index + 1)
end
end
local params = {
path = uri,
query = args or ctx.var.args,
headers = include_req_headers(ctx),
method = core.request.get_method(),
}

local body, err = core.request.get_body()
if err then
return nil, err
end

if body then
params["body"] = body
end

local res, err = http_obj:request(params)
if not res then
return nil, err
end

return res, err
end


local function send_response(res, code)
ngx.status = code or res.status

local reader = res.body_reader
repeat
local chunk, ok, read_err, print_err, flush_err
-- TODO: HEAD or 304
chunk, read_err = reader()
if read_err then
return "read response failed: ".. (read_err or "")
end

if chunk then
ok, print_err = ngx_print(chunk)
if not ok then
return "output response failed: ".. (print_err or "")
end
ok, flush_err = ngx_flush(true)
if not ok then
core.log.warn("flush response failed: ", flush_err)
end
end
until not chunk

return nil
end



function _M.check_schema(conf)
return core.schema.check(_M.schema, conf)
end


function _M.before_proxy(conf, ctx)
local http_obj = http.new()
local res, err = get_response(ctx, http_obj)
if not res or err then
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line 100 has judgment that res is nil?

core.log.error("failed to request: ", err or "")
close(http_obj)
return 502
end
ctx.runner_ext_response = res

core.log.info("response info, status: ", res.status)
core.log.info("response info, headers: ", core.json.delay_encode(res.headers))

local code, body = ext.communicate(conf, ctx, name, constants.RPC_HTTP_RESP_CALL)
if body then
close(http_obj)
-- if the body is changed, the code will be set.
return code, body
end
core.log.info("ext-plugin will send response")

-- send origin response, status maybe changed.
err = send_response(res, code)
close(http_obj)

if err then
core.log.error(err)
return not ngx.headers_sent and 502 or nil
end

core.log.info("ext-plugin send response succefully")
end


return _M
110 changes: 106 additions & 4 deletions apisix/plugins/ext-plugin/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ local http_req_call_resp = require("A6.HTTPReqCall.Resp")
local http_req_call_action = require("A6.HTTPReqCall.Action")
local http_req_call_stop = require("A6.HTTPReqCall.Stop")
local http_req_call_rewrite = require("A6.HTTPReqCall.Rewrite")
local http_resp_call_req = require("A6.HTTPRespCall.Req")
local http_resp_call_resp = require("A6.HTTPRespCall.Resp")
local extra_info = require("A6.ExtraInfo.Info")
local extra_info_req = require("A6.ExtraInfo.Req")
local extra_info_var = require("A6.ExtraInfo.Var")
Expand Down Expand Up @@ -605,8 +607,6 @@ local rpc_handlers = {
end

if action_type == http_req_call_action.Rewrite then
ctx.request_rewritten = constants.REWRITTEN_BY_EXT_PLUGIN

local action = call_resp:Action()
local rewrite = http_req_call_rewrite.New()
rewrite:Init(action.bytes, action.pos)
Expand Down Expand Up @@ -682,6 +682,107 @@ local rpc_handlers = {

return true
end,
nil, -- ignore RPC_EXTRA_INFO, already processed during RPC_HTTP_REQ_CALL interaction
function (conf, ctx, sock, entry)
local lrucache_id = core.lrucache.plugin_ctx_id(ctx, entry)
local token, err = core.lrucache.plugin_ctx(lrucache, ctx, entry, rpc_call,
constants.RPC_PREPARE_CONF, conf, ctx,
lrucache_id)
if not token then
return nil, err
end

builder:Clear()
local var = ctx.var

local res = ctx.runner_ext_response
local textEntries = {}
local hdrs = res.headers
for key, val in pairs(hdrs) do
local ty = type(val)
if ty == "table" then
for _, v in ipairs(val) do
core.table.insert(textEntries, build_headers(var, builder, key, v))
end
else
core.table.insert(textEntries, build_headers(var, builder, key, val))
end
end
local len = #textEntries
http_resp_call_req.StartHeadersVector(builder, len)
for i = len, 1, -1 do
builder:PrependUOffsetTRelative(textEntries[i])
end
local hdrs_vec = builder:EndVector(len)

local id = generate_id()
local status = res.status

http_resp_call_req.Start(builder)
http_resp_call_req.AddId(builder, id)
http_resp_call_req.AddStatus(builder, status)
http_resp_call_req.AddConfToken(builder, token)
http_resp_call_req.AddHeaders(builder, hdrs_vec)

local req = http_resp_call_req.End(builder)
builder:Finish(req)

local ok, err = send(sock, constants.RPC_HTTP_RESP_CALL, builder:Output())
if not ok then
return nil, "failed to send RPC_HTTP_RESP_CALL: " .. err
end

local ty, resp = receive(sock)
if ty == nil then
return nil, "failed to receive RPC_HTTP_RESP_CALL: " .. resp
end

if ty ~= constants.RPC_HTTP_RESP_CALL then
return nil, "failed to receive RPC_HTTP_RESP_CALL: unexpected type " .. ty
end

local buf = flatbuffers.binaryArray.New(resp)
local call_resp = http_resp_call_resp.GetRootAsResp(buf, 0)
local len = call_resp:HeadersLength()
if len > 0 then
local resp_headers = {}
for i = 1, len do
local entry = call_resp:Headers(i)
local name = str_lower(entry:Name())
if not exclude_resp_header[name] then
if resp_headers[name] == nil then
core.response.set_header(name, entry:Value())
resp_headers[name] = true
else
core.response.add_header(name, entry:Value())
end
end
end
else
-- Filter out origin headeres
for k, v in pairs(res.headers) do
if not exclude_resp_header[str_lower(k)] then
core.response.set_header(k, v)
end
end
end

local body
local len = call_resp:BodyLength()
if len > 0 then
-- TODO: support empty body
body = call_resp:BodyAsString()
end
local code = call_resp:Status()
core.log.info("recv resp, code: ", code, " body: ", body, " len: ", len)

if code == 0 then
-- runner changes body only, we should set code.
code = body and res.status or nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look like it's covered in ngx.status = code or res.status?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ping @soulbird

end

return true, nil, code, body
end
}


Expand Down Expand Up @@ -721,12 +822,13 @@ local function recreate_lrucache()
end


function _M.communicate(conf, ctx, plugin_name)
function _M.communicate(conf, ctx, plugin_name, rpc_cmd)
local ok, err, code, body
local tries = 0
local ty = rpc_cmd and rpc_cmd or constants.RPC_HTTP_REQ_CALL
while tries < 3 do
tries = tries + 1
ok, err, code, body = rpc_call(constants.RPC_HTTP_REQ_CALL, conf, ctx, plugin_name)
ok, err, code, body = rpc_call(ty, conf, ctx, plugin_name)
if ok then
if code then
return code, body
Expand Down
1 change: 1 addition & 0 deletions conf/config-default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ plugins: # plugin list (sorted by priority)
- openwhisk # priority: -1901
- serverless-post-function # priority: -2000
- ext-plugin-post-req # priority: -3000
- ext-plugin-post-resp # priority: -4000

stream_plugins: # sorted by priority
- ip-restriction # priority: 3000
Expand Down
2 changes: 1 addition & 1 deletion rockspec/apisix-master-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ dependencies = {
"luasec = 0.9-1",
"lua-resty-consul = 0.3-2",
"penlight = 1.9.2-1",
"ext-plugin-proto = 0.4.0",
"ext-plugin-proto = 0.5.0",
"casbin = 1.26.0",
"api7-snowflake = 2.0-1",
"inspect == 3.1.1",
Expand Down
1 change: 1 addition & 0 deletions t/admin/plugins.t
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ azure-functions
openwhisk
serverless-post-function
ext-plugin-post-req
ext-plugin-post-resp



Expand Down
Loading