Skip to content

Commit

Permalink
Merge pull request #2 from lxnewayfarer/master
Browse files Browse the repository at this point in the history
Fix overwriting of low-level Kafka options with auth options
  • Loading branch information
bibendi authored Jan 27, 2025
2 parents 616cd8c + 6af9317 commit 9008ce3
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 2 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,18 @@ auth:
sasl_mechanism: SCRAM-SHA-512
```

If you need to use another variant, use the low-level custom Kafka options `kafka_config:` of `config/kafka_producer.yml`. These options will overwrite the options in the auth section.

Example of SASL_SSL protocol auth via `kafka_config`:

```yaml
kafka_config:
security.protocol: SASL_SSL
sasl.username: user
sasl.password: pwd
ssl.ca.pem: ca_cert
sasl.mechanism: SCRAM-SHA-512
```
### `kafka` config section

The `servers` key is required and should be in rdkafka format: without `kafka://` prefix, for example: `srv1:port1,srv2:port2,...`.
Expand Down
4 changes: 2 additions & 2 deletions lib/sbmt/kafka_producer/config/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ def instance
coerce_types auth: coerce_to(Auth)

def to_kafka_options
kafka.to_kafka_options
.merge(auth.to_kafka_options)
auth.to_kafka_options
.merge(kafka.to_kafka_options)
end
end
end
Expand Down
1 change: 1 addition & 0 deletions spec/rails_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
ENGINE_ROOT = Pathname.new(File.expand_path("..", __dir__))

require "spec_helper"
require "logger"
require "combustion"

begin
Expand Down
34 changes: 34 additions & 0 deletions spec/sbmt/kafka_producer/config/producer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,39 @@
expect(config.metrics_listener_class).to eq("::Sbmt::KafkaProducer::Instrumentation::YabedaMetricsListener")
end
end

context "when kafka_config options overwrite auth params" do
let(:ca_cert) { OpenSSL::PKey::RSA.new(2048).to_s }
let(:default_env) do
super().merge(
"KAFKA_PRODUCER_KAFKA__KAFKA_CONFIG__SECURITY.PROTOCOL" => "SASL_SSL",
"KAFKA_PRODUCER_KAFKA__KAFKA_CONFIG__SASL.MECHANISM" => "SCRAM-SHA-512",
"KAFKA_PRODUCER_KAFKA__KAFKA_CONFIG__SSL.CA.PEM" => ca_cert
)
end

it "properly merges kafka options uses auth params from low-level config" do
with_env(default_env) do
expect(config.to_kafka_options)
.to eq(kafka_config_defaults.merge(
"bootstrap.servers": "server1:9092,server2:9092",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "SCRAM-SHA-512",
"ssl.ca.pem": ca_cert,
"sasl.password": "password",
"sasl.username": "username",
# loaded from kafka_producer.yml
"message.send.max.retries": 2,
"request.required.acks": -1,
"request.timeout.ms": 1000,
"retry.backoff.ms": 1000,
"socket.connection.setup.timeout.ms": 2000,
# arbitrary parameters for section kafka_config file kafka_producer.yml
"queue.buffering.max.messages": 1,
"queue.buffering.max.ms": 10000
))
end
end
end
end
end

0 comments on commit 9008ce3

Please sign in to comment.