diff --git a/README.md b/README.md index 449031a..b6cfeaa 100644 --- a/README.md +++ b/README.md @@ -106,6 +106,18 @@ auth: sasl_mechanism: SCRAM-SHA-512 ``` +If you need to use another variant, use the low-level custom Kafka options `kafka_config:` of `config/kafka_producer.yml`. These options will overwrite the options in the auth section. + +Example of SASL_SSL protocol auth via `kafka_config`: + +```yaml +kafka_config: + security.protocol: SASL_SSL + sasl.username: user + sasl.password: pwd + ssl.ca.pem: ca_cert + sasl.mechanism: SCRAM-SHA-512 +``` ### `kafka` config section The `servers` key is required and should be in rdkafka format: without `kafka://` prefix, for example: `srv1:port1,srv2:port2,...`. diff --git a/lib/sbmt/kafka_producer/config/producer.rb b/lib/sbmt/kafka_producer/config/producer.rb index 4b955a1..ad80ded 100644 --- a/lib/sbmt/kafka_producer/config/producer.rb +++ b/lib/sbmt/kafka_producer/config/producer.rb @@ -41,8 +41,8 @@ def instance coerce_types auth: coerce_to(Auth) def to_kafka_options - kafka.to_kafka_options - .merge(auth.to_kafka_options) + auth.to_kafka_options + .merge(kafka.to_kafka_options) end end end diff --git a/spec/rails_helper.rb b/spec/rails_helper.rb index 0f52e59..6c75601 100644 --- a/spec/rails_helper.rb +++ b/spec/rails_helper.rb @@ -6,6 +6,7 @@ ENGINE_ROOT = Pathname.new(File.expand_path("..", __dir__)) require "spec_helper" +require "logger" require "combustion" begin diff --git a/spec/sbmt/kafka_producer/config/producer_spec.rb b/spec/sbmt/kafka_producer/config/producer_spec.rb index 23b38f7..e2acc48 100644 --- a/spec/sbmt/kafka_producer/config/producer_spec.rb +++ b/spec/sbmt/kafka_producer/config/producer_spec.rb @@ -52,5 +52,39 @@ expect(config.metrics_listener_class).to eq("::Sbmt::KafkaProducer::Instrumentation::YabedaMetricsListener") end end + + context "when kafka_config options overwrite auth params" do + let(:ca_cert) { OpenSSL::PKey::RSA.new(2048).to_s } + let(:default_env) do + super().merge( + "KAFKA_PRODUCER_KAFKA__KAFKA_CONFIG__SECURITY.PROTOCOL" => "SASL_SSL", + "KAFKA_PRODUCER_KAFKA__KAFKA_CONFIG__SASL.MECHANISM" => "SCRAM-SHA-512", + "KAFKA_PRODUCER_KAFKA__KAFKA_CONFIG__SSL.CA.PEM" => ca_cert + ) + end + + it "properly merges kafka options uses auth params from low-level config" do + with_env(default_env) do + expect(config.to_kafka_options) + .to eq(kafka_config_defaults.merge( + "bootstrap.servers": "server1:9092,server2:9092", + "security.protocol": "SASL_SSL", + "sasl.mechanism": "SCRAM-SHA-512", + "ssl.ca.pem": ca_cert, + "sasl.password": "password", + "sasl.username": "username", + # loaded from kafka_producer.yml + "message.send.max.retries": 2, + "request.required.acks": -1, + "request.timeout.ms": 1000, + "retry.backoff.ms": 1000, + "socket.connection.setup.timeout.ms": 2000, + # arbitrary parameters for section kafka_config file kafka_producer.yml + "queue.buffering.max.messages": 1, + "queue.buffering.max.ms": 10000 + )) + end + end + end end end