diff --git a/apisix/plugins/http-logger.lua b/apisix/plugins/http-logger.lua index 7a7910222930..4122d1e88dae 100644 --- a/apisix/plugins/http-logger.lua +++ b/apisix/plugins/http-logger.lua @@ -21,20 +21,21 @@ local core = require("apisix.core") local http = require("resty.http") local url = require("net.url") local plugin = require("apisix.plugin") + local ngx = ngx local tostring = tostring local pairs = pairs -local ipairs = ipairs +local ipairs = ipairs local str_byte = string.byte - +local timer_at = ngx.timer.at local plugin_name = "http-logger" +local stale_timer_running = false local buffers = {} local lru_log_format = core.lrucache.new({ ttl = 300, count = 512 }) - local schema = { type = "object", properties = { @@ -92,6 +93,8 @@ local function send_http_data(conf, log_message) local host = url_decoded.host local port = url_decoded.port + core.log.info("sending a batch logs to ", conf.uri) + if ((not port) and url_decoded.scheme == "https") then port = 443 elseif not port then @@ -169,6 +172,23 @@ local function gen_log_format(metadata) end +-- remove stale objects from the memory after timer expires +local function remove_stale_objects(premature) + if premature then + return + end + + for key, batch in ipairs(buffers) do + if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then + core.log.warn("removing batch processor stale object, route id:", tostring(key)) + buffers[key] = nil + end + end + + stale_timer_running = false +end + + function _M.log(conf, ctx) local metadata = plugin.plugin_metadata(plugin_name) core.log.info("metadata: ", core.json.delay_encode(metadata)) @@ -199,7 +219,13 @@ function _M.log(conf, ctx) entry.route_id = "no-matched" end - local log_buffer = buffers[entry.route_id] + if not stale_timer_running then + -- run the timer every 30 mins if any log is present + timer_at(1800, remove_stale_objects) + stale_timer_running = true + end + + local log_buffer = buffers[conf] if log_buffer then log_buffer:push(entry) @@ -209,6 +235,7 @@ function _M.log(conf, ctx) -- Generate a function to be executed by the batch processor local func = function(entries, batch_max_size) local data, err + if conf.concat_method == "json" then if batch_max_size == 1 then data, err = core.json.encode(entries[1]) -- encode as single {} @@ -260,7 +287,7 @@ function _M.log(conf, ctx) return end - buffers[entry.route_id] = log_buffer + buffers[conf] = log_buffer log_buffer:push(entry) end diff --git a/apisix/plugins/tcp-logger.lua b/apisix/plugins/tcp-logger.lua index ced5f8f23dad..cf8dc99e2e54 100644 --- a/apisix/plugins/tcp-logger.lua +++ b/apisix/plugins/tcp-logger.lua @@ -23,7 +23,7 @@ local buffers = {} local ngx = ngx local tcp = ngx.socket.tcp local ipairs = ipairs -local stale_timer_running = false; +local stale_timer_running = false local timer_at = ngx.timer.at local schema = { diff --git a/t/plugin/http-logger.t b/t/plugin/http-logger.t index 0b5619fa1cd7..789e1b73d92a 100644 --- a/t/plugin/http-logger.t +++ b/t/plugin/http-logger.t @@ -637,3 +637,150 @@ GET /t done --- no_error_log [error] + + + +=== TEST 17: check plugin configuration updating +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body1 = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "http-logger": { + "uri": "http://127.0.0.1:1982/hello", + "batch_max_size": 1, + "max_retry_count": 1, + "retry_delay": 2, + "buffer_duration": 2, + "inactive_timeout": 2 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1982": 1 + }, + "type": "roundrobin" + }, + "uri": "/opentracing" + }]], + [[{ + "node": { + "value": { + "plugins": { + "http-logger": { + "uri": "http://127.0.0.1:1982/hello", + "batch_max_size": 1, + "max_retry_count": 1, + "retry_delay": 2, + "buffer_duration": 2, + "inactive_timeout": 2 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1982": 1 + }, + "type": "roundrobin" + }, + "uri": "/opentracing" + }, + "key": "/apisix/routes/1" + }, + "action": "set" + }]] + ) + + if code >= 300 then + ngx.status = code + ngx.say("fail") + return + end + + local code, _, body2 = t("/opentracing", "GET") + if code >= 300 then + ngx.status = code + ngx.say("fail") + return + end + + local code, body3 = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "http-logger": { + "uri": "http://127.0.0.1:1982/hello1", + "batch_max_size": 1, + "max_retry_count": 1, + "retry_delay": 2, + "buffer_duration": 2, + "inactive_timeout": 2 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1982": 1 + }, + "type": "roundrobin" + }, + "uri": "/opentracing" + }]], + [[{ + "node": { + "value": { + "plugins": { + "http-logger": { + "uri": "http://127.0.0.1:1982/hello1", + "batch_max_size": 1, + "max_retry_count": 1, + "retry_delay": 2, + "buffer_duration": 2, + "inactive_timeout": 2 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1982": 1 + }, + "type": "roundrobin" + }, + "uri": "/opentracing" + }, + "key": "/apisix/routes/1" + }, + "action": "set" + }]] + ) + + if code >= 300 then + ngx.status = code + ngx.say("fail") + return + end + + local code, _, body4 = t("/opentracing", "GET") + if code >= 300 then + ngx.status = code + ngx.say("fail") + return + end + + ngx.print(body1) + ngx.print(body2) + ngx.print(body3) + ngx.print(body4) + } + } +--- request +GET /t +--- wait: 0.5 +--- response_body +passedopentracing +passedopentracing +--- grep_error_log eval +qr/sending a batch logs to http:\/\/127.0.0.1:1982\/hello\d?/ +--- grep_error_log_out +sending a batch logs to http://127.0.0.1:1982/hello +sending a batch logs to http://127.0.0.1:1982/hello1