Merge branch 'feat/DEX-2546/support-for-kafka-tags-in-ULS' into 'master'
[DEX-2546] feat: for synchronous messages we place logs in tags

Closes DEX-2546

See merge request nstmrt/rubygems/sbmt-kafka_producer!36
Arlantir committed Sep 18, 2024
2 parents 693123f + 0c72512 commit d416096
Showing 6 changed files with 103 additions and 20 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -13,6 +13,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Fixed

## [3.1.0] - 2024-09-13

### Added

- For synchronous messages and errors, logs are now placed in tags

### Fixed

- Fixed mock for tests

## [3.0.0] - 2024-08-27

## BREAKING
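For illustration, the new behaviour roughly corresponds to the sketch below when the configured logger supports `tagged`. The logger setup, tag values and exact rendering are assumptions for this example, not output captured from the gem:

require "active_support/tagged_logging"
require "logger"

# Assumed setup: a plain Logger wrapped in ActiveSupport::TaggedLogging,
# matching what the specs in this merge request use.
logger = ActiveSupport::TaggedLogging.new(Logger.new($stdout))

# Roughly what BaseProducer#log_success now does after a successful sync
# publish; topic, partition, offset and duration values are illustrative.
logger.tagged(kafka: {topic: "my_topic", partition: 0, offset: 0, produce_duration_ms: 0.1}) do
  logger.info("Message has been successfully sent to Kafka")
end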
50 changes: 44 additions & 6 deletions lib/sbmt/kafka_producer/base_producer.rb
@@ -5,14 +5,18 @@ module KafkaProducer
class BaseProducer
extend Dry::Initializer

MSG_SUCCESS = "Message has been successfully sent to Kafka"

option :client, default: -> { KafkaClientFactory.default_client }
option :topic

def sync_publish!(payload, options = {})
report = around_publish do
client.produce_sync(payload: payload, **options.merge(topic: topic))
report, produce_duration = around_publish do
measure_time do
client.produce_sync(payload: payload, **options.merge(topic: topic))
end
end
log_success(report)
log_success(report, produce_duration)
true
end

@@ -78,12 +82,19 @@ def ignore_kafka_errors?
def log_error(error)
return true if ignore_kafka_errors?

logger.error "KAFKA ERROR: #{format_exception_error(error)}\n#{error.backtrace.join("\n")}"
log_tags = {stacktrace: error.backtrace.join("\n")}

logger.tagged(log_tags) do
logger.send(:error, "KAFKA ERROR: #{format_exception_error(error)}")
end

ErrorTracker.error(error)
end

def log_success(report)
logger.info "Message has been successfully sent to Kafka - topic: #{report.topic_name}, partition: #{report.partition}, offset: #{report.offset}"
def log_success(report, produce_duration)
log_tags = {kafka: log_tags(report, produce_duration)}

log_with_tags(log_tags)
end

def format_exception_error(error)
@@ -100,6 +111,33 @@ def with_cause?(error)
error.respond_to?(:cause) && error.cause.present?
end

def log_tags(report, produce_duration)
{
topic: report.topic_name,
partition: report.partition,
offset: report.offset,
produce_duration_ms: produce_duration
}
end

def log_with_tags(log_tags)
return unless logger.respond_to?(:tagged)

logger.tagged(log_tags) do
logger.send(:info, MSG_SUCCESS)
end
end

def measure_time
start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
result = yield
end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

elapsed_time = end_time - start_time

[result, elapsed_time]
end

def config
Config::Producer
end
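A brief usage sketch of the updated producer; the topic name and payload below are made up for illustration, and the client still falls back to KafkaClientFactory.default_client:

# Hypothetical call site; "orders" and the payload are illustrative only.
producer = Sbmt::KafkaProducer::BaseProducer.new(topic: "orders")

# On success, MSG_SUCCESS is now logged inside logger.tagged(kafka: {...})
# with topic, partition, offset and produce_duration_ms taken from the
# delivery report and the monotonic-clock measurement in measure_time.
producer.sync_publish!('{"order_id":1}')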
4 changes: 4 additions & 0 deletions lib/sbmt/kafka_producer/logger.rb
@@ -20,6 +20,10 @@ class Logger
def add(...)
logger.add(...)
end

def tagged(...)
logger.tagged(...)
end
end
end
end
4 changes: 3 additions & 1 deletion lib/sbmt/kafka_producer/testing/configure_producer_client.rb
@@ -1,8 +1,10 @@
# frozen_string_literal: true

class FakeWaterDropClient
Report = Struct.new(:topic_name, :partition, :offset)

def produce_sync(*)
# no op
Report.new("fake_topic", 0, 0)
end

def produce_async(*)
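The fake WaterDrop client now returns a stub delivery report, so the producer's tagged success logging has real topic/partition/offset values to read in tests. A minimal sketch, assuming the testing helper that defines FakeWaterDropClient has been loaded (the require path below is inferred from this file's location):

# Assumed require path; adjust to however the gem's testing helpers are loaded.
require "sbmt/kafka_producer/testing/configure_producer_client"

client = FakeWaterDropClient.new
report = client.produce_sync(payload: "{}")

report.topic_name # => "fake_topic"
report.partition  # => 0
report.offset     # => 0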
2 changes: 1 addition & 1 deletion lib/sbmt/kafka_producer/version.rb
@@ -2,6 +2,6 @@

module Sbmt
module KafkaProducer
VERSION = "3.0.0"
VERSION = "3.1.0"
end
end
53 changes: 41 additions & 12 deletions spec/sbmt/kafka_producer/base_producer_spec.rb
@@ -28,26 +28,42 @@ def initialize(message, cause)
label: nil,
wait: delivery_report)
end
let(:logger) { ActiveSupport::TaggedLogging.new(Logger.new($stdout)) }
let(:options) { {seed_brokers: "kafka://kafka:9092"} }

before do
allow(Sbmt::KafkaProducer::KafkaClientFactory).to receive(:default_client).and_return(client)
allow(Sbmt::KafkaProducer).to receive(:logger).and_return(logger)
end

describe "#sync_publish" do
let(:options) { {seed_brokers: "kafka://kafka:9092"} }

context "when payload is successfully delivered" do
before do
allow(client).to receive(:produce_sync).with(
payload: payload,
topic: "test_topic",
seed_brokers: "kafka://kafka:9092"
).and_return(delivery_report)
).and_return(delivery_report, 0.1)
end

it "produces the payload via the client and returns true" do
expect(producer.sync_publish(payload, options)).to be(true)
end

it "logs the success message with correct tags" do
expect(logger).to receive(:tagged).with(hash_including(
kafka: hash_including(
topic: "my_topic",
partition: 0,
offset: 0,
produce_duration_ms: kind_of(Numeric)
)
)).and_yield

expect(logger).to receive(:info).with("Message has been successfully sent to Kafka")

producer.sync_publish(payload, options)
end
end

context "when delivery fails with Kafka::DeliveryFailed" do
@@ -71,28 +87,43 @@ def initialize(message, cause)
end

it "raises an error" do
expect(Sbmt::KafkaProducer.logger).to receive(:error).with(/KAFKA ERROR: StandardError Second Exception. TestWrapError First Exception/)
expect(logger).to receive(:tagged).with(include(:stacktrace)).and_yield
expect(logger).to receive(:error).with(/KAFKA ERROR: StandardError Second Exception. TestWrapError First Exception/)

expect(producer.sync_publish(payload, options)).to be(false)
end
end
end
end

describe "#sync_publish!" do
let(:options) { {seed_brokers: "kafka://kafka:9092"} }

context "when payload is successfully delivered" do
before do
allow(client).to receive(:produce_sync).with(
payload: payload,
topic: "test_topic",
seed_brokers: "kafka://kafka:9092"
).and_return(delivery_report)
).and_return(delivery_report, 0.2)
end

it "produces the payload via the client and returns true" do
expect(producer.sync_publish!(payload, options)).to be(true)
end

it "logs the success message with correct tags" do
expect(logger).to receive(:tagged).with(hash_including(
kafka: hash_including(
topic: "my_topic",
partition: 0,
offset: 0,
produce_duration_ms: kind_of(Numeric)
)
)).and_yield

expect(logger).to receive(:info).with("Message has been successfully sent to Kafka")

producer.sync_publish!(payload, options)
end
end

context "when delivery fails with Kafka::DeliveryFailed" do
@@ -107,8 +138,6 @@ def initialize(message, cause)
end

describe "#async_publish" do
let(:options) { {seed_brokers: "kafka://kafka:9092"} }

context "when payload is successfully delivered" do
before do
allow(client).to receive(:produce_async).with(
@@ -144,16 +173,16 @@ def initialize(message, cause)
end

it "raises an error" do
expect(Sbmt::KafkaProducer.logger).to receive(:error).with(/KAFKA ERROR: StandardError Second Exception. TestWrapError First Exception/)
expect(logger).to receive(:tagged).with(include(:stacktrace)).and_yield
expect(logger).to receive(:error).with(/KAFKA ERROR: StandardError Second Exception. TestWrapError First Exception/)

expect(producer.async_publish(payload, options)).to be(false)
end
end
end
end

describe "#async_publish!" do
let(:options) { {seed_brokers: "kafka://kafka:9092"} }

context "when payload is successfully delivered" do
before do
allow(client).to receive(:produce_async).with(
