feat(plugins): add new confluent plugin (#9947)
* feat(plugins): add new confluent plugin

---------

Co-authored-by: Brent Yarger <[email protected]>
Co-authored-by: samugi <[email protected]>
3 people authored Aug 19, 2024
1 parent c55d4fa commit 45ee373
Showing 19 changed files with 520 additions and 5 deletions.
5 changes: 5 additions & 0 deletions .github/labeler.yml
@@ -362,6 +362,10 @@ plugins/json-threat-protection:
- changed-files:
- any-glob-to-any-file: kong/plugins/json-threat-protection/**/*

plugins/confluent:
- changed-files:
- any-glob-to-any-file: kong/plugins/confluent/**/*

plugins-ee/ai-azure-content-safety:
- changed-files:
- any-glob-to-any-file: plugins-ee/ai-azure-content-safety/**/*
@@ -382,6 +386,7 @@ plugins-ee/kafka-upstream:
- changed-files:
- any-glob-to-any-file: plugins-ee/kafka-upstream/**/*


plugins-ee/konnect-application-auth:
- changed-files:
- any-glob-to-any-file: plugins-ee/konnect-application-auth/**/*
4 changes: 4 additions & 0 deletions .github/workflows/build_and_test.yml
@@ -640,6 +640,10 @@ jobs:
KONG_SPEC_TEST_GRPCBIN_PORT: "15002"
KONG_SPEC_TEST_GRPCBIN_SSL_PORT: "15003"
KONG_SPEC_TEST_REDIS_STACK_PORT: "16379"
KONG_SPEC_TEST_CONFLUENT_HOST: ${{ secrets.KONG_SPEC_TEST_CONFLUENT_HOST }}
KONG_SPEC_TEST_CONFLUENT_PORT: ${{ secrets.KONG_SPEC_TEST_CONFLUENT_PORT }}
KONG_SPEC_TEST_CONFLUENT_CLUSTER_API_KEY: ${{ secrets.KONG_SPEC_TEST_CONFLUENT_CLUSTER_API_KEY }}
KONG_SPEC_TEST_CONFLUENT_CLUSTER_API_SECRET: ${{ secrets.KONG_SPEC_TEST_CONFLUENT_CLUSTER_API_SECRET }}
KONG_SPEC_TEST_OTELCOL_FILE_EXPORTER_PATH: ${{ github.workspace }}/tmp/otel/file_exporter.json
KONG_SPEC_TEST_OLD_VERSION_KONG_PATH: ${{ github.workspace }}/kong-ee-old
KONG_TEST_LICENSE_DATA: ${{ steps.decrypted_license.outputs.out }}
2 changes: 2 additions & 0 deletions .requirements
@@ -52,8 +52,10 @@ MSGPACK_C_SHA256=674119f1a85b5f2ecc4c7d5c2859edf50c0b05e0c10aa0df85eefa2c8c14b79
OPENSSL_FIPS_PROVIDER=3.0.9
OPENSSL_FIPS_PROVIDER_SHA256=eb1ab04781474360f77c318ab89d8c5a03abc38e63d65a603cabbf1b00a1dc90


KONGROCKS=v1.2.67


# EE Debugging tools
CURL=8.9.1
CURL_SHA256=291124a007ee5111997825940b3876b3048f7d31e73e9caa681b80fe48b2dcd5
3 changes: 3 additions & 0 deletions changelog/unreleased/kong-ee/add-confluent-plugin.yml
@@ -0,0 +1,3 @@
message: "**confluent:** Added the `confluent` plugin, which allows Kong to interface with Confluent Cloud."
type: feature
scope: Plugin
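
For orientation, here is a minimal busted-style sketch of enabling the new plugin on a route, following the blueprint conventions of Kong's existing integration specs; the broker address and credentials are placeholders, and the `spec.helpers` usage is assumed from those suites rather than taken from this commit:

-- Sketch only: wiring the confluent plugin to a route via test blueprints.
-- Broker host/port and credentials below are illustrative placeholders.
local helpers = require "spec.helpers"

local bp = helpers.get_db_utils("postgres", nil, { "confluent" })

local route = bp.routes:insert({ hosts = { "confluent.test" } })

bp.plugins:insert({
  name = "confluent",
  route = { id = route.id },
  config = {
    bootstrap_servers = { { host = "broker.confluent.example", port = 9092 } },
    topic = "kong",
    cluster_api_key = "<CLUSTER_API_KEY>",
    cluster_api_secret = "<CLUSTER_API_SECRET>",
    forward_method = true,
    forward_uri = true,
    forward_headers = true,
    forward_body = true,
  },
})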

This file was deleted.

@@ -0,0 +1,3 @@
message: |
Bumped `kong-lua-resty-kafka` to `0.20` to support TCP socket keepalive and to allow a `client_id` to be set for the Kafka client.
type: dependency
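
As a sketch of what this bump enables (the producer API shown follows upstream lua-resty-kafka; the client id value is the constant hard-coded by the new handler below, everything else is a placeholder):

-- Sketch, assuming the upstream lua-resty-kafka producer API: the 0.20
-- bump allows a client_id in the producer configuration table.
local producer = require "resty.kafka.producer"

local broker_list = {
  { host = "broker.confluent.example", port = 9092 },  -- placeholder broker
}

local p = producer:new(broker_list, {
  ssl = true,
  client_id = "cwc|001f100001XcA82AAF",  -- constant used by the new handler
})

local ok, err = p:send("kong", nil, "hello from kong")
if not ok then
  ngx.log(ngx.ERR, "failed to send: ", err)
end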
3 changes: 3 additions & 0 deletions kong-3.8.0-0.rockspec
@@ -880,6 +880,9 @@ build = {
["kong.plugins.azure-functions.handler"] = "kong/plugins/azure-functions/handler.lua",
["kong.plugins.azure-functions.schema"] = "kong/plugins/azure-functions/schema.lua",

["kong.plugins.confluent.handler"] = "kong/plugins/confluent/handler.lua",
["kong.plugins.confluent.schema"] = "kong/plugins/confluent/schema.lua",

["kong.enterprise_edition.kafka.plugins.producers"] = "kong/enterprise_edition/kafka/plugins/producers.lua",

["kong.plugins.opentelemetry.migrations"] = "kong/plugins/opentelemetry/migrations/init.lua",
1 change: 1 addition & 0 deletions kong/enterprise_edition/constants.lua
@@ -71,6 +71,7 @@ return {
"ai-semantic-prompt-guard",
"ai-semantic-cache",
"json-threat-protection",
"confluent",
},

-- this list is joined into the DICTS list in kong.constants
2 changes: 2 additions & 0 deletions kong/enterprise_edition/kafka/plugins/producers.lua
@@ -53,6 +53,8 @@ local function create(conf)
max_buffering = conf.producer_async_buffering_limits_messages_in_memory,

ssl = conf.security.ssl,

client_id = conf.client_id
}
local cluster_name = conf.cluster_name

114 changes: 114 additions & 0 deletions kong/plugins/confluent/handler.lua
@@ -0,0 +1,114 @@
-- This software is copyright Kong Inc. and its licensors.
-- Use of the software is subject to the agreement between your organization
-- and Kong Inc. If there is no such agreement, use is governed by and
-- subject to the terms of the Kong Master Software License Agreement found
-- at https://konghq.com/enterprisesoftwarelicense/.
-- [ END OF LICENSE 0867164ffc95e54f04670b5169c09574bdbd9bba ]

local kong = kong
local producers = require "kong.enterprise_edition.kafka.plugins.producers"
local meta = require "kong.meta"
local cjson_encode = require("cjson").encode

local ngx_encode_base64 = ngx.encode_base64

local CONFLUENT_CLIENT_ID = "cwc|001f100001XcA82AAF"

local ConfluentHandler = {}

ConfluentHandler.PRIORITY = 752
ConfluentHandler.VERSION = meta.core_version


local raw_content_types = {
["text/plain"] = true,
["text/html"] = true,
["application/xml"] = true,
["text/xml"] = true,
["application/soap+xml"] = true,
}


local function build_kafka_message_from_request(conf)
local method
if conf.forward_method then
method = kong.request.get_method()
end

local headers
if conf.forward_headers then
headers = kong.request.get_headers()
end

local uri, uri_args
if conf.forward_uri then
uri = kong.request.get_path_with_query()
uri_args = kong.request.get_query()
end

local body, body_args, body_base64
if conf.forward_body then
body = kong.request.get_raw_body()
local err
body_args, err = kong.request.get_body()
if err and err:match("content type") then
body_args = {}
local content_type = kong.request.get_header("content-type")
if not raw_content_types[content_type] then
-- don't know what this body MIME type is, base64 it just in case
body = ngx_encode_base64(body)
body_base64 = true
end
end
end

return cjson_encode({
method = method,
headers = headers,
uri = uri,
uri_args = uri_args,
body = body,
body_args = body_args,
body_base64 = body_base64,
})
end

function ConfluentHandler:access(conf)
local message, err = build_kafka_message_from_request(conf)
if not message then
return producers.handle_error({
status_code = 500,
internal_error = "could not build a Kafka message from request: " .. tostring(err),
external_error = "could not build Kafka message"
})
end

-- Translate config to producer config as this plugin needs a simplified schema.
local config = {
topic = conf.topic,
bootstrap_servers = conf.bootstrap_servers,
timeout = conf.timeout,
keepalive = conf.keepalive,
keepalive_enabled = conf.keepalive_enabled,
authentication = {
mechanism = "PLAIN",
strategy = "sasl",
user = conf.cluster_api_key,
password = conf.cluster_api_secret,
-- confluent_cloud_api_key = conf.confluent_cloud_api_key,
-- confluent_cloud_api_secret = conf.confluent_cloud_api_secret,
},
security = {
ssl = true,
},
client_id = CONFLUENT_CLIENT_ID,
}
local ok, s_err = producers.send_message(config, message)
if not ok then
return producers.handle_error(s_err)
end

return kong.response.exit(200, { message = "message sent" })
end

return ConfluentHandler
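
To make the payload concrete, here is a sketch of what a consumer might decode from a message produced with all forward_* options enabled; the field set mirrors the cjson_encode table above, while the values are invented:

-- Sketch: decoding a message built by build_kafka_message_from_request.
-- Field names come from the handler above; the values are illustrative.
-- body_base64 is absent here because the JSON body parsed cleanly, so it
-- was not base64-encoded.
local cjson = require "cjson"

local payload = cjson.decode([[{
  "method": "POST",
  "uri": "/orders?id=42",
  "uri_args": { "id": "42" },
  "headers": { "content-type": "application/json" },
  "body": "{\"item\":\"book\"}",
  "body_args": { "item": "book" }
}]])

print(payload.method, payload.uri)  --> POST  /orders?id=42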
89 changes: 89 additions & 0 deletions kong/plugins/confluent/schema.lua
@@ -0,0 +1,89 @@
-- This software is copyright Kong Inc. and its licensors.
-- Use of the software is subject to the agreement between your organization
-- and Kong Inc. If there is no such agreement, use is governed by and
-- subject to the terms of the Kong Master Software License Agreement found
-- at https://konghq.com/enterprisesoftwarelicense/.
-- [ END OF LICENSE 0867164ffc95e54f04670b5169c09574bdbd9bba ]

local typedefs = require "kong.db.schema.typedefs"

return {
name = "confluent",
fields = {
{ protocols = typedefs.protocols_http },
{ consumer_group = typedefs.no_consumer_group },
{ config = {
type = "record",
fields = {
{ bootstrap_servers = { description = "Set of bootstrap brokers in a `{host: host, port: port}` list format.", type = "set",
elements = {
type = "record",
fields = {
{ host = typedefs.host({ required = true }), },
{ port = typedefs.port({ required = true }), },
},
},
},
},
{ topic = { description = "The Kafka topic to publish to.", type = "string", required = true }, },
{ timeout = { description = "Socket timeout in milliseconds.", type = "integer", default = 10000 }, },
{ keepalive = { description = "Keepalive timeout in milliseconds.", type = "integer", default = 60000 }, },
{ keepalive_enabled = { description = "Flag to enable TCP keepalive for the connection to the broker.", type = "boolean", default = false }, },
{ cluster_api_key = { description = "Username/API key for SASL authentication.",
type = "string",
required = true,
encrypted = true,
referenceable = true } },
{ cluster_api_secret = { description = "Password/API secret for SASL authentication.",
type = "string",
required = true,
encrypted = true,
referenceable = true } },
{ confluent_cloud_api_key = { description = "API key for authentication with Confluent Cloud. This allows for management tasks such as creating topics, ACLs, etc.",
type = "string",
required = false,
encrypted = true,
referenceable = true, } },

{ confluent_cloud_api_secret = { description = "The corresponding secret for the Confluent Cloud API key.",
type = "string",
required = false,
encrypted = true,
referenceable = true, } },

{ forward_method = { description = "Include the request method in the message. At least one of these must be true: `forward_method`, `forward_uri`, `forward_headers`, `forward_body`.", type = "boolean", default = false } },
{ forward_uri = { description = "Include the request URI and URI arguments (as in, query arguments) in the message. At least one of these must be true: `forward_method`, `forward_uri`, `forward_headers`, `forward_body`.", type = "boolean", default = false } },
{ forward_headers = { description = "Include the request headers in the message. At least one of these must be true: `forward_method`, `forward_uri`, `forward_headers`, `forward_body`.", type = "boolean", default = false } },
{ forward_body = { description = "Include the request body in the message. At least one of these must be true: `forward_method`, `forward_uri`, `forward_headers`, `forward_body`.", type = "boolean", default = true } },

-- TODO change cluster_name to required in 3.0
{ cluster_name = { description = "An identifier for the Kafka cluster. By default, this field generates a random string. You can also set your own custom cluster identifier. If more than one Kafka plugin is configured without a `cluster_name` (that is, if the default autogenerated value is removed), these plugins will use the same producer, and by extension, the same cluster. Messages will be sent to the leader of the cluster.", type = "string", required = false, auto = true } },

{ producer_request_acks = { description = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. Allowed values: 0 for no acknowledgments; 1 for only the leader; and -1 for the full ISR (In-Sync Replica set).", type = "integer", default = 1, one_of = { -1, 0, 1 }, }, },
{ producer_request_timeout = { description = "Time to wait for a Produce response in milliseconds.", type = "integer", default = 2000 }, },
{ producer_request_limits_messages_per_request = { description = "Maximum number of messages to include in a single producer request.", type = "integer", default = 200 }, },
{ producer_request_limits_bytes_per_request = { description = "Maximum size of a Produce request in bytes.", type = "integer", default = 1048576 }, },
{ producer_request_retries_max_attempts = { description = "Maximum number of retry attempts per single Produce request.", type = "integer", default = 10 }, },
{ producer_request_retries_backoff_timeout = { description = "Backoff interval between retry attempts in milliseconds.", type = "integer", default = 100 }, },
{ producer_async = { description = "Flag to enable asynchronous mode.", type = "boolean", default = true }, },
{ producer_async_flush_timeout = { description = "Maximum time interval in milliseconds between buffer flushes in asynchronous mode.", type = "integer", default = 1000 }, },
{ producer_async_buffering_limits_messages_in_memory = { description = "Maximum number of messages that can be buffered in memory in asynchronous mode.", type = "integer", default = 50000 }, },
},

entity_checks = {
{ custom_entity_check = {
field_sources = { "forward_method", "forward_uri", "forward_headers", "forward_body" },
fn = function(entity)
if entity.forward_method or entity.forward_uri
or entity.forward_headers or entity.forward_body then
return true
end
return nil, "at least one of these attributes must be true: forward_method, forward_uri, forward_headers, forward_body"
end
},
},
},
},
},
},
}
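
A quick way to see the entity check in action (a sketch; validate_plugin_config_schema is assumed from Kong's spec helpers as used elsewhere in the test suite):

-- Sketch: the custom entity check rejects a config with every
-- forward_* flag disabled.
local helpers = require "spec.helpers"
local confluent_schema = require "kong.plugins.confluent.schema"

local ok, err = helpers.validate_plugin_config_schema({
  topic = "kong",
  cluster_api_key = "key",
  cluster_api_secret = "secret",
  forward_method = false,
  forward_uri = false,
  forward_headers = false,
  forward_body = false,
}, confluent_schema)
-- expected: ok is nil and err reports that at least one forward_*
-- attribute must be true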
2 changes: 1 addition & 1 deletion plugins-ee/kafka-log/kong-plugin-kafka-log-dev-0.rockspec
@@ -10,7 +10,7 @@ description = {
}
dependencies = {
"lua >= 5.1",
"kong-lua-resty-kafka == 0.19",
"kong-lua-resty-kafka == 0.20",
}
build = {
type = "builtin",
@@ -10,7 +10,7 @@ description = {
}
dependencies = {
"lua >= 5.1",
"kong-lua-resty-kafka == 0.19",
"kong-lua-resty-kafka == 0.20",
}
build = {
type = "builtin",
8 changes: 8 additions & 0 deletions scripts/check-version-compatibility
@@ -325,6 +325,14 @@ ENABLE_KAFKA_UPSTREAM=(
--data "config.authentication.mechanism=SCRAM-SHA-512"
)

ENABLE_CONFLUENT=(
--data "name=confluent"
--data "config.topic=kong"
--data "config.cluster_name=kong"
--data "config.authentication.cluster_api_key=abcdef12-3456-7890-abcd-ef1234567890"
--data "config.authentication.cluster_api_secret=abcdef12-3456-7890-abcd-ef1234567890"
)

ENABLE_KEY_AUTH_ENC=(
--data "name=key-auth-enc"
)
1 change: 1 addition & 0 deletions spec-ee/03-plugins/01-plugins_order_spec.lua
@@ -182,6 +182,7 @@ describe("Plugins", function()
'ai-response-transformer',
"ai-semantic-cache",
'standard-webhooks',
'confluent',
'kafka-upstream',
'aws-lambda',
'azure-functions',