diff --git a/apisix/plugins/error-log-logger.lua b/apisix/plugins/error-log-logger.lua
index d847ca1ce852..f2028d4f5269 100644
--- a/apisix/plugins/error-log-logger.lua
+++ b/apisix/plugins/error-log-logger.lua
@@ -21,6 +21,7 @@ local batch_processor = require("apisix.utils.batch-processor")
 local plugin = require("apisix.plugin")
 local timers = require("apisix.timers")
 local http = require("resty.http")
+local producer = require("resty.kafka.producer")
 local plugin_name = "error-log-logger"
 local table = core.table
 local schema_def = core.schema
@@ -32,6 +33,9 @@ local string = require("string")
 local lrucache = core.lrucache.new({
     ttl = 300, count = 32
 })
+local kafka_prod_lrucache = core.lrucache.new({
+    ttl = 300, count = 32
+})


 local metadata_schema = {
@@ -66,6 +70,62 @@ local metadata_schema = {
             },
             required = {"endpoint_addr", "user", "password", "database", "logtable"}
         },
+        kafka = {
+            type = "object",
+            properties = {
+                brokers = {
+                    type = "array",
+                    minItems = 1,
+                    items = {
+                        type = "object",
+                        properties = {
+                            host = {
+                                type = "string",
+                                description = "the host of kafka broker",
+                            },
+                            port = {
+                                type = "integer",
+                                minimum = 1,
+                                maximum = 65535,
+                                description = "the port of kafka broker",
+                            },
+                            sasl_config = {
+                                type = "object",
+                                description = "sasl config",
+                                properties = {
+                                    mechanism = {
+                                        type = "string",
+                                        default = "PLAIN",
+                                        enum = {"PLAIN"},
+                                    },
+                                    user = { type = "string", description = "user" },
+                                    password = { type = "string", description = "password" },
+                                },
+                                required = {"user", "password"},
+                            },
+                        },
+                        required = {"host", "port"},
+                    },
+                    uniqueItems = true,
+                },
+                kafka_topic = {type = "string"},
+                producer_type = {
+                    type = "string",
+                    default = "async",
+                    enum = {"async", "sync"},
+                },
+                required_acks = {
+                    type = "integer",
+                    default = 1,
+                    enum = { 0, 1, -1 },
+                },
+                key = {type = "string"},
+                -- in lua-resty-kafka, cluster_name is defined as number
+                -- see https://github.com/doujiang24/lua-resty-kafka#new-1
+                cluster_name = {type = "integer", minimum = 1, default = 1},
+            },
+            required = {"brokers", "kafka_topic"},
+        },
         name = {type = "string", default = plugin_name},
         level = {type = "string", default = "WARN", enum = {"STDERR", "EMERG", "ALERT", "CRIT",
                 "ERR", "ERROR", "WARN", "NOTICE", "INFO", "DEBUG"}},
@@ -81,6 +141,7 @@ local metadata_schema = {
         {required = {"skywalking"}},
         {required = {"tcp"}},
         {required = {"clickhouse"}},
+        {required = {"kafka"}},
         -- for compatible with old schema
         {required = {"host", "port"}}
     },
@@ -285,11 +346,63 @@ local function update_filter(value)
 end


+local function create_producer(broker_list, broker_config, cluster_name)
+    core.log.info("create new kafka producer instance")
+    return producer:new(broker_list, broker_config, cluster_name)
+end
+
+
+local function send_to_kafka(log_message)
+    -- avoid race of the global config
+    local metadata = plugin.plugin_metadata(plugin_name)
+    if not (metadata and metadata.value and metadata.modifiedIndex) then
+        return false, "please set the correct plugin_metadata for " .. plugin_name
+    end
+    local config, err = lrucache(plugin_name, metadata.modifiedIndex, update_filter, metadata.value)
+    if not config then
+        return false, "get config failed: " .. err
+    end
+
+    core.log.info("sending a batch logs to kafka brokers: ",
+                  core.json.delay_encode(config.kafka.brokers))
+
+    local broker_config = {}
+    broker_config["request_timeout"] = config.timeout * 1000
+    broker_config["producer_type"] = config.kafka.producer_type
+    broker_config["required_acks"] = config.kafka.required_acks
+
+    -- reuse producer via kafka_prod_lrucache to avoid unbalanced partitions of messages in kafka
+    local prod, err = kafka_prod_lrucache(plugin_name, metadata.modifiedIndex,
+                                          create_producer, config.kafka.brokers, broker_config,
+                                          config.kafka.cluster_name)
+    if not prod then
+        return false, "get kafka producer failed: " .. err
+    end
+    core.log.info("kafka cluster name ", config.kafka.cluster_name, ", broker_list[1] port ",
+                  prod.client.broker_list[1].port)
+
+    local ok
+    for i = 1, #log_message, 2 do
+        ok, err = prod:send(config.kafka.kafka_topic,
+                            config.kafka.key, core.json.encode(log_message[i]))
+        if not ok then
+            return false, "failed to send data to Kafka topic: " .. err ..
+                          ", brokers: " .. core.json.encode(config.kafka.brokers)
+        end
+        core.log.info("send data to kafka: ", core.json.delay_encode(log_message[i]))
+    end
+
+    return true
+end
+
+
 local function send(data)
     if config.skywalking then
         return send_to_skywalking(data)
     elseif config.clickhouse then
         return send_to_clickhouse(data)
+    elseif config.kafka then
+        return send_to_kafka(data)
     end
     return send_to_tcp_server(data)
 end
@@ -307,7 +420,7 @@ local function process()
             core.log.warn("set log filter failed for ", err)
             return
         end
-        if not (config.tcp or config.skywalking or config.clickhouse) then
+        if not (config.tcp or config.skywalking or config.clickhouse or config.kafka) then
            config.tcp = {
                 host = config.host,
                 port = config.port,
diff --git a/docs/en/latest/plugins/error-log-logger.md b/docs/en/latest/plugins/error-log-logger.md
index 63b1be1c727b..6e4db909331d 100644
--- a/docs/en/latest/plugins/error-log-logger.md
+++ b/docs/en/latest/plugins/error-log-logger.md
@@ -28,7 +28,7 @@ description: This document contains information about the Apache APISIX error-lo

 ## Description

-The `error-log-logger` Plugin is used to push APISIX's error logs (`error.log`) to TCP, [Apache SkyWalking](https://skywalking.apache.org/), or ClickHouse servers. You can also set the error log level to send the logs to server.
+The `error-log-logger` Plugin is used to push APISIX's error logs (`error.log`) to TCP, [Apache SkyWalking](https://skywalking.apache.org/), Apache Kafka, or ClickHouse servers. You can also set the error log level to send the logs to server.

 It might take some time to receive the log data. It will be automatically sent after the timer function in the [batch processor](../batch-processor.md) expires.

@@ -48,6 +48,18 @@
 | clickhouse.password | String | False | | | ClickHouse password. |
 | clickhouse.database | String | False | | | Name of the database to store the logs. |
 | clickhouse.logtable | String | False | | | Table name to store the logs. |
+| kafka.brokers | array | True | | | List of Kafka brokers (nodes). |
+| kafka.brokers.host | string | True | | | The host of the Kafka broker, e.g., `192.168.1.1`. |
+| kafka.brokers.port | integer | True | | [1, 65535] | The port of the Kafka broker. |
+| kafka.brokers.sasl_config | object | False | | | SASL configuration of the Kafka broker. |
+| kafka.brokers.sasl_config.mechanism | string | False | "PLAIN" | ["PLAIN"] | The mechanism of the SASL authentication. |
+| kafka.brokers.sasl_config.user | string | True | | | The user of `sasl_config`. Required if `sasl_config` exists. |
+| kafka.brokers.sasl_config.password | string | True | | | The password of `sasl_config`. Required if `sasl_config` exists. |
+| kafka.kafka_topic | string | True | | | Target Kafka topic to push the logs to. |
+| kafka.producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. |
+| kafka.required_acks | integer | False | 1 | [0, 1, -1] | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka `acks` attribute. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. |
+| kafka.key | string | False | | | Key used for allocating partitions for messages. |
+| kafka.cluster_name | integer | False | 1 | [1,...] | Name of the cluster. Used when there are two or more Kafka clusters. Only works if the `producer_type` attribute is set to `async`. |
 | timeout | integer | False | 3 | [1,...] | Timeout (in seconds) for the upstream to connect and send data. |
 | keepalive | integer | False | 30 | [1,...] | Time in seconds to keep the connection alive after sending data. |
 | level | string | False | WARN | ["STDERR", "EMERG", "ALERT", "CRIT", "ERR", "ERROR", "WARN", "NOTICE", "INFO", "DEBUG"] | Log level to filter the error logs. `ERR` is same as `ERROR`. |
@@ -118,6 +130,28 @@ curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/error-log-logger -H 'X-A
 }'
 ```

+### Configuring Kafka server
+
+The Plugin sends the error log to Kafka. You can configure it as shown below:
+
+```shell
+curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/error-log-logger \
+-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
+{
+    "kafka":{
+        "brokers":[
+            {
+                "host":"127.0.0.1",
+                "port":9092
+            }
+        ],
+        "kafka_topic":"test2"
+    },
+    "level":"ERROR",
+    "inactive_timeout":1
+}'
+```
+
 ## Disable Plugin

 To disable the Plugin, you can remove it from your configuration file (`conf/config.yaml`):
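The Kafka example in the documentation change above targets an unauthenticated broker. If the broker requires SASL/PLAIN authentication, the `sasl_config` fields documented in the attributes table can be supplied per broker. The sketch below is illustrative only: the broker address and port, the credentials, and the topic name are placeholder values rather than anything defined by this change.

```shell
# Illustrative values: replace the broker address, credentials, and topic with your own.
# The field names follow the kafka schema added to apisix/plugins/error-log-logger.lua above.
curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/error-log-logger \
-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "kafka": {
        "brokers": [
            {
                "host": "127.0.0.1",
                "port": 9093,
                "sasl_config": {
                    "mechanism": "PLAIN",
                    "user": "kafka-user",
                    "password": "kafka-password"
                }
            }
        ],
        "kafka_topic": "apisix-error-logs",
        "producer_type": "sync",
        "required_acks": -1
    },
    "level": "ERROR",
    "inactive_timeout": 1
}'
```

Setting `producer_type` to `sync` together with `required_acks` of `-1` trades some latency for stronger delivery guarantees, since the leader broker waits for the full set of in-sync replicas before acknowledging each batch.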
diff --git a/docs/zh/latest/plugins/error-log-logger.md b/docs/zh/latest/plugins/error-log-logger.md
index daee4c521f5a..d0e5af184526 100644
--- a/docs/zh/latest/plugins/error-log-logger.md
+++ b/docs/zh/latest/plugins/error-log-logger.md
@@ -5,7 +5,7 @@ keywords:
   - API 网关
   - 错误日志
   - Plugin
-description: API 网关 Apache APISIX error-log-logger 插件用于将 APISIX 的错误日志推送到 TCP、Apache SkyWalking 或 ClickHouse 服务器。
+description: API 网关 Apache APISIX error-log-logger 插件用于将 APISIX 的错误日志推送到 TCP、Apache SkyWalking、Apache Kafka 或 ClickHouse 服务器。
 ---
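To confirm that error log entries actually reach the broker, one option is to watch the topic with the console consumer shipped with Kafka. The broker address and topic below assume the plain example from the English documentation above; adjust both to your own setup. Each message should be a JSON-encoded error log line, since `send_to_kafka` passes every entry through `core.json.encode` before calling `prod:send`.

```shell
# Assumes a local broker on 127.0.0.1:9092 and the "test2" topic used in the example above.
# kafka-console-consumer.sh is found in the bin/ directory of a Kafka distribution.
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test2 --from-beginning
```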