feat: support send error-log to kafka brokers #8693

Merged · 8 commits · Jan 31, 2023
Changes from 3 commits
111 changes: 110 additions & 1 deletion apisix/plugins/error-log-logger.lua
@@ -19,6 +19,7 @@ local core = require("apisix.core")
local errlog = require("ngx.errlog")
local batch_processor = require("apisix.utils.batch-processor")
local plugin = require("apisix.plugin")
local producer = require ("resty.kafka.producer")
local timers = require("apisix.timers")
local http = require("resty.http")
Member: Let's group the resty module together.
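A minimal sketch of the ordering the comment asks for, using only the requires already present in this diff:

```lua
-- sketch: keep the resty.* requires next to each other
local http = require("resty.http")
local producer = require("resty.kafka.producer")
```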

local plugin_name = "error-log-logger"
@@ -66,6 +67,62 @@ local metadata_schema = {
},
required = {"endpoint_addr", "user", "password", "database", "logtable"}
},
kafka = {
type = "object",
properties = {
brokers = {
type = "array",
minItems = 1,
items = {
type = "object",
properties = {
host = {
type = "string",
description = "the host of kafka broker",
},
port = {
type = "integer",
minimum = 1,
maximum = 65535,
description = "the port of kafka broker",
},
sasl_config = {
type = "object",
description = "sasl config",
properties = {
mechanism = {
type = "string",
default = "PLAIN",
enum = {"PLAIN"},
},
user = { type = "string", description = "user" },
password = { type = "string", description = "password" },
},
required = {"user", "password"},
},
},
required = {"host", "port"},
},
uniqueItems = true,
},
kafka_topic = {type = "string"},
producer_type = {
type = "string",
default = "async",
enum = {"async", "sync"},
},
required_acks = {
type = "integer",
default = 1,
enum = { 0, 1, -1 },
},
key = {type = "string"},
-- in lua-resty-kafka, cluster_name is defined as number
-- see https://github.com/doujiang24/lua-resty-kafka#new-1
cluster_name = {type = "integer", minimum = 1, default = 1},
},
required = {"brokers", "kafka_topic"},
},
name = {type = "string", default = plugin_name},
level = {type = "string", default = "WARN", enum = {"STDERR", "EMERG", "ALERT", "CRIT",
"ERR", "ERROR", "WARN", "NOTICE", "INFO", "DEBUG"}},
@@ -81,6 +138,7 @@ local metadata_schema = {
{required = {"skywalking"}},
{required = {"tcp"}},
{required = {"clickhouse"}},
{required = {"kafka"}},
-- for compatible with old schema
{required = {"host", "port"}}
},
@@ -272,6 +330,55 @@ local function send_to_clickhouse(log_message)
end


local function create_producer(broker_list, broker_config, cluster_name)
core.log.info("create new kafka producer instance")
return producer:new(broker_list, broker_config, cluster_name)
end


local function send_to_kafka(log_message)
core.log.info("sending a batch logs to kafka brokers: ", core.json.encode(config.kafka.brokers))
Member: Can we use delay_encode?
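For illustration, a sketch of what the suggestion might look like; `core.json.delay_encode` defers the JSON encoding until the log entry is actually written, so the encoding cost is skipped when the log level filters the line out:

```lua
-- sketch: encode lazily instead of eagerly building the JSON string
core.log.info("sending a batch logs to kafka brokers: ",
              core.json.delay_encode(config.kafka.brokers))
```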


local broker_config = {}
broker_config["request_timeout"] = config.timeout * 1000
Member: Where is the config from? This function doesn't have an argument called config.

ronething (Contributor, Author), Jan 17, 2023: It comes from

timeout = {type = "integer", minimum = 1, default = 3},

local config = {}

and set by
config, err = lrucache(plugin_name, metadata.modifiedIndex, update_filter, metadata.value)

broker_config["producer_type"] = config.kafka.producer_type
broker_config["required_acks"] = config.kafka.required_acks

local prod
local err

local metadata = plugin.plugin_metadata(plugin_name)
Member: It seems using the config passed from process here might create a race:

Consider we have c1(config) and m1 (modifiedIndex) in process, and c2/m2 in send. It looks like we might use m2 as key and c1 as value in the cache below.

ronething (Contributor, Author), Jan 28, 2023: @spacewander

Regarding "It looks like we might use m2 as key and c1 as value in the cache below": do you mean that we need to clone config before we use it, like below? Then we can use m2 and c2 in send.

local config = core.table.clone(config)

Member: A clone of c1 still has c1's value. Maybe we can get the c2 in send like what we have done in process.

ronething (Contributor, Author): OK, I will change the code and update the PR later.

ronething (Contributor, Author): @spacewander Could you please take a look, thanks.
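A sketch of the direction agreed on above, assuming `send_to_kafka` rebuilds its own config from the current plugin metadata the same way `process` does, so the lrucache key (`modifiedIndex`) and the cached value always come from one metadata snapshot; the names mirror the existing code, but this is not the exact final diff:

```lua
-- sketch: read the metadata inside send_to_kafka and derive config from it
local metadata = plugin.plugin_metadata(plugin_name)
if not (metadata and metadata.value and metadata.modifiedIndex) then
    return false, "please set the correct plugin_metadata for " .. plugin_name
end

local config, err = lrucache(plugin_name, metadata.modifiedIndex,
                             update_filter, metadata.value)
if not config then
    return false, "get config failed: " .. err
end
```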

if not (metadata and metadata.value and metadata.modifiedIndex) then
core.log.info("please set the correct plugin_metadata for ", plugin_name)
return
Member: Should return a boolean here?

else
Member: We can use end here, no need to nest the code.
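A sketch of the flatter shape the last two comments point at: return a boolean on the early exit and drop the `else`, so the rest of the function is not nested:

```lua
-- sketch: early return with a boolean; no else branch needed afterwards
if not (metadata and metadata.value and metadata.modifiedIndex) then
    core.log.info("please set the correct plugin_metadata for ", plugin_name)
    return false
end
-- producer lookup / creation continues here at the outer level
```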

-- reuse producer via lrucache to avoid unbalanced partitions of messages in kafka
prod, err = lrucache(plugin_name .. "#kafka", metadata.modifiedIndex,
Member: Better to use a separate lrucache to cache different data.
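A sketch of the suggestion, assuming a second cache instance dedicated to producers is created next to the existing one; the `ttl` and `count` values below are placeholders, not values from this PR:

```lua
-- sketch: a separate lrucache used only for kafka producer instances
local kafka_prod_lrucache = core.lrucache.new({
    ttl = 300, count = 32
})

-- then, inside send_to_kafka:
prod, err = kafka_prod_lrucache(plugin_name .. "#kafka", metadata.modifiedIndex,
                                create_producer, config.kafka.brokers,
                                broker_config, config.kafka.cluster_name)
```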

create_producer, config.kafka.brokers, broker_config,
config.kafka.cluster_name)
if not prod then
return false, "get kafka producer failed " .. err
Member: Suggested change:
- return false, "get kafka producer failed " .. err
+ return false, "get kafka producer failed: " .. err

end
core.log.info("kafka cluster name ", config.kafka.cluster_name, ", broker_list[1] port ",
prod.client.broker_list[1].port)
end


local ok
for i = 1, #log_message, 2 do
ok, err = prod:send(config.kafka.kafka_topic,
config.kafka.key, core.json.encode(log_message[i]))
if not ok then
return false, "failed to send data to Kafka topic: " .. err ..
", brokers: " .. core.json.encode(config.kafka.brokers)
end
core.log.info("send data to kafka: ", core.json.encode(log_message[i]))
Member: Can we use delay_encode?

end

return true
end


local function update_filter(value)
local level = log_level[value.level]
local status, err = errlog.set_filter_level(level)
@@ -290,6 +397,8 @@ local function send(data)
return send_to_skywalking(data)
elseif config.clickhouse then
return send_to_clickhouse(data)
elseif config.kafka then
return send_to_kafka(data)
end
return send_to_tcp_server(data)
end
@@ -307,7 +416,7 @@ local function process()
core.log.warn("set log filter failed for ", err)
return
end
if not (config.tcp or config.skywalking or config.clickhouse) then
if not (config.tcp or config.skywalking or config.clickhouse or config.kafka) then
config.tcp = {
host = config.host,
port = config.port,
35 changes: 34 additions & 1 deletion docs/en/latest/plugins/error-log-logger.md
@@ -28,7 +28,7 @@ description: This document contains information about the Apache APISIX error-lo

## Description

The `error-log-logger` Plugin is used to push APISIX's error logs (`error.log`) to TCP, [Apache SkyWalking](https://skywalking.apache.org/), or ClickHouse servers. You can also set the error log level to send the logs to server.
The `error-log-logger` Plugin is used to push APISIX's error logs (`error.log`) to TCP, [Apache SkyWalking](https://skywalking.apache.org/), Apache Kafka, or ClickHouse servers. You can also set the error log level to send the logs to the server.

It might take some time to receive the log data. It will be automatically sent after the timer function in the [batch processor](../batch-processor.md) expires.

@@ -48,6 +48,17 @@ It might take some time to receive the log data. It will be automatically sent a
| clickhouse.password | String | False | | | ClickHouse password. |
| clickhouse.database | String | False | | | Name of the database to store the logs. |
| clickhouse.logtable | String | False | | | Table name to store the logs. |
| kafka.brokers | array | True | | | List of Kafka brokers (nodes). |
| kafka.brokers.host | string | True | | | The host of the Kafka broker, e.g., `192.168.1.1`. |
| kafka.brokers.port | integer | True | | [1, 65535] | The port of the Kafka broker. |
| kafka.brokers.sasl_config | object | False | | | The SASL config of the Kafka broker. |
| kafka.brokers.sasl_config.mechanism | string | False | "PLAIN" | ["PLAIN"] | The mechanism of the SASL config. |
| kafka.brokers.sasl_config.user | string | True | | | The user of sasl_config. If sasl_config exists, it's required. |
| kafka.brokers.sasl_config.password | string | True | | | The password of sasl_config. If sasl_config exists, it's required. |
| kafka.kafka_topic | string | True | | | Target topic to push the logs for organisation. |
| kafka.producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. |
| kafka.required_acks | integer | False | 1 | [0, 1, -1] | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka `acks` attribute. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. |
| kafka.key | string | False | | | Key used for allocating partitions for messages. |
Member: cluster_name is not documented?
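If the row were added here, the schema in this PR (`cluster_name`: integer, minimum 1, default 1) suggests something along these lines; the description text is an assumption about intent, not wording from the PR:

| kafka.cluster_name | integer | False | 1 | [1,...] | Name (ID) of the Kafka cluster, relevant when more than one cluster is configured. |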

| timeout | integer | False | 3 | [1,...] | Timeout (in seconds) for the upstream to connect and send data. |
| keepalive | integer | False | 30 | [1,...] | Time in seconds to keep the connection alive after sending data. |
| level | string | False | WARN | ["STDERR", "EMERG", "ALERT", "CRIT", "ERR", "ERROR", "WARN", "NOTICE", "INFO", "DEBUG"] | Log level to filter the error logs. `ERR` is same as `ERROR`. |
@@ -118,6 +129,28 @@ curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/error-log-logger -H 'X-A
}'
```

### Configuring Kafka server

The Plugin can send the error log to Kafka. You can configure it as shown below:

```shell
curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/error-log-logger \
-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"kafka":{
"brokers":[
{
"host":"127.0.0.1",
"port":9092
}
],
"kafka_topic":"test2"
},
"level":"ERROR",
"inactive_timeout":1
}'
```

## Disable Plugin

To disable the Plugin, you can remove it from your configuration file (`conf/config.yaml`):
37 changes: 35 additions & 2 deletions docs/zh/latest/plugins/error-log-logger.md
@@ -5,7 +5,7 @@ keywords:
- API Gateway
- Error log
- Plugin
description: The API Gateway Apache APISIX error-log-logger Plugin is used to push APISIX error logs to TCP, Apache SkyWalking, or ClickHouse servers.
description: The API Gateway Apache APISIX error-log-logger Plugin is used to push APISIX error logs to TCP, Apache SkyWalking, Apache Kafka, or ClickHouse servers.
---

<!--
@@ -29,7 +29,7 @@ description: The API Gateway Apache APISIX error-log-logger Plugin is used to push APISIX

## Description

The `error-log-logger` Plugin is used to push APISIX's error logs (`error.log`) to TCP, Apache SkyWalking, or ClickHouse servers. You can also set the error log level for sending logs to the server.
The `error-log-logger` Plugin is used to push APISIX's error logs (`error.log`) to TCP, Apache SkyWalking, Apache Kafka, or ClickHouse servers. You can also set the error log level for sending logs to the server.

## Attributes

@@ -47,6 +47,17 @@ description: The API Gateway Apache APISIX error-log-logger Plugin is used to push APISIX
| clickhouse.password | String | False | | | ClickHouse password. |
| clickhouse.database | String | False | | | ClickHouse database used to receive the logs. |
| clickhouse.logtable | String | False | | | ClickHouse table used to receive the logs. |
| kafka.brokers | array | True | | | List of Kafka brokers to push the logs to. |
| kafka.brokers.host | string | True | | | Host of the Kafka broker node, e.g. `192.168.1.1`. |
| kafka.brokers.port | integer | True | | [1, 65535] | Port of the Kafka broker node. |
| kafka.brokers.sasl_config | object | False | | | The sasl_config of the Kafka broker. |
| kafka.brokers.sasl_config.mechanism | string | False | "PLAIN" | ["PLAIN"] | SASL authentication mechanism of the Kafka broker. |
| kafka.brokers.sasl_config.user | string | True | | | User in the sasl_config of the Kafka broker. Required if sasl_config exists. |
| kafka.brokers.sasl_config.password | string | True | | | Password in the sasl_config of the Kafka broker. Required if sasl_config exists. |
| kafka.kafka_topic | string | True | | | Target Kafka topic to push the logs to. |
| kafka.producer_type | string | False | async | ["async", "sync"] | Message sending mode of the producer. |
| kafka.required_acks | integer | False | 1 | [0, 1, -1] | Number of acknowledgements the producer requires the leader to have received before considering a request complete. This controls the durability of the sent records. The configuration is the same as the Kafka `acks` attribute; see the [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for details. |
| kafka.key | string | False | | | Key used for allocating partitions for messages. |
| timeout | integer | False | 3 | [1,...] | Timeout (in seconds) for connecting and sending data. |
| keepalive | integer | False | 30 | [1,...] | Time in seconds to keep the connection alive when reusing it. |
| level | string | False | WARN | | Log level used to filter the error logs. Defaults to `WARN`. Valid values: ["STDERR", "EMERG", "ALERT", "CRIT", "ERR", "ERROR", "WARN", "NOTICE", "INFO", "DEBUG"], where `ERR` is the same level as `ERROR`. |
@@ -127,6 +138,28 @@ curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/error-log-logger \
}'
```

### Configuring Kafka

The Plugin supports sending the error log to Kafka. You can configure it as shown below:

```shell
curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/error-log-logger \
-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"kafka":{
"brokers":[
{
"host":"127.0.0.1",
"port":9092
}
],
"kafka_topic":"test2"
},
"level":"ERROR",
"inactive_timeout":1
}'
```

## Disable Plugin

To disable the Plugin, simply remove or comment it out in `./conf/config.yaml`.