Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add batch request plugin. #1388

Merged
merged 21 commits into from
Apr 29, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions apisix/http/aggregate.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local http = require("resty.http")
local ngx = ngx
local ipairs = ipairs
local pairs = pairs
local type = type

local _M = {
version = 0.1,
}

local function check_input(data)
if not data.pipeline then
return 400, {message = "missing 'pipeline' in input"}
end
local type_timeout = type(data.timeout)
if type_timeout ~= "number" and type_timeout ~= "nil" then
return 400, {message = "'timeout' should be number"}
end
if not data.timeout or data.timeout == 0 then
data.timeout = 30000
end
end

local function set_base_header(data)
if not data.headers then
return
end

for i,req in ipairs(data.pipeline) do
if not req.headers then
req.headers = data.headers
else
for k, v in pairs(data.headers) do
if not req.headers[k] then
req.headers[k] = v
end
end
end
end
end

local function set_base_query(data)
if not data.query then
return
end

for i,req in ipairs(data.pipeline) do
if not req.query then
req.query = data.query
else
for k, v in pairs(data.query) do
if not req.query[k] then
req.query[k] = v
end
end
end
end
end

function _M.aggregate(service_id)
ngx.req.read_body()
local req_body = ngx.req.get_body_data()
local data, err = core.json.decode(req_body)
local code, body = check_input(data)
if not data then
core.log.error("invalid request body: ", req_body, " err: ", err)
return 400, {message = "invalid request body", req_body = req_body}
end

if code then
return code, body
end

local httpc = http.new()
core.log.info(data.timeout)
httpc:set_timeout(data.timeout)
httpc:connect("127.0.0.1", ngx.var.server_port)
set_base_header(data)
set_base_query(data)
local responses, err = httpc:request_pipeline(data.pipeline)
if not responses then
return 400, {message = "request failed", err = err}
end

local aggregated_resp = {}
for i,r in ipairs(responses) do
if not r.status then
core.table.insert(aggregated_resp, {
status = 504,
reason = "target timeout"
})
end
local sub_resp = {
status = r.status,
reason = r.reason,
headers = r.headers,
}
if r.has_body then
sub_resp.body = r:read_body()
end
core.table.insert(aggregated_resp, sub_resp)
end
return 200, aggregated_resp
end

return _M
5 changes: 5 additions & 0 deletions apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
local require = require
local core = require("apisix.core")
local plugin = require("apisix.plugin")
local aggregate = require("apisix.http.aggregate").aggregate
local service_fetch = require("apisix.http.service").get
local admin_init = require("apisix.admin.init")
local get_var = require("resty.ngxvar").fetch
Expand Down Expand Up @@ -630,5 +631,9 @@ function _M.stream_log_phase()
run_plugin("log")
end

function _M.http_api_aggregate()
local code, body = aggregate()
core.response.exit(code, body)
end

return _M
6 changes: 6 additions & 0 deletions bin/apisix
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,12 @@ http {
}
{% end %}

location /apisix/aggregate {
content_by_lua_block {
apisix.http_api_aggregate()
}
}

ssl_certificate_by_lua_block {
apisix.http_ssl_phase()
}
Expand Down
6 changes: 6 additions & 0 deletions t/APISIX.pm
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,12 @@ _EOC_
}
}

location /apisix/aggregate {
content_by_lua_block {
apisix.http_api_aggregate()
}
}

location / {
set \$upstream_mirror_host '';
set \$upstream_scheme 'http';
Expand Down
Loading