From 0c725121a30640b61f617d56c9b547acfe47fc03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A2=D0=B0=D1=80=D0=B0=D1=81=D0=B5=D0=BD=D0=BA=D0=BE=20?= =?UTF-8?q?=D0=94=D0=B5=D0=BD=D0=B8=D1=81=20=D0=90=D0=BD=D0=B0=D1=82=D0=BE?= =?UTF-8?q?=D0=BB=D1=8C=D0=B5=D0=B2=D0=B8=D1=87?= Date: Wed, 18 Sep 2024 08:31:59 +0000 Subject: [PATCH] [DEX-2546] feat: for synchronous messages we place logs in tags --- CHANGELOG.md | 10 ++++ lib/sbmt/kafka_producer/base_producer.rb | 50 ++++++++++++++--- lib/sbmt/kafka_producer/logger.rb | 4 ++ .../testing/configure_producer_client.rb | 4 +- lib/sbmt/kafka_producer/version.rb | 2 +- .../sbmt/kafka_producer/base_producer_spec.rb | 53 ++++++++++++++----- 6 files changed, 103 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d74825..e725b29 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Fixed +## [3.1.0] - 2024-09-13 + +### Added + +- For synchronous messages and errors, we place logs in tags + +### Fixed + +- Fixed mock for tests + ## [3.0.0] - 2024-08-27 ## BREAKING diff --git a/lib/sbmt/kafka_producer/base_producer.rb b/lib/sbmt/kafka_producer/base_producer.rb index 10eccd1..a994292 100644 --- a/lib/sbmt/kafka_producer/base_producer.rb +++ b/lib/sbmt/kafka_producer/base_producer.rb @@ -5,14 +5,18 @@ module KafkaProducer class BaseProducer extend Dry::Initializer + MSG_SUCCESS = "Message has been successfully sent to Kafka" + option :client, default: -> { KafkaClientFactory.default_client } option :topic def sync_publish!(payload, options = {}) - report = around_publish do - client.produce_sync(payload: payload, **options.merge(topic: topic)) + report, produce_duration = around_publish do + measure_time do + client.produce_sync(payload: payload, **options.merge(topic: topic)) + end end - log_success(report) + log_success(report, produce_duration) true end @@ -78,12 +82,19 @@ def ignore_kafka_errors? def log_error(error) return true if ignore_kafka_errors? - logger.error "KAFKA ERROR: #{format_exception_error(error)}\n#{error.backtrace.join("\n")}" + log_tags = {stacktrace: error.backtrace.join("\n")} + + logger.tagged(log_tags) do + logger.send(:error, "KAFKA ERROR: #{format_exception_error(error)}") + end + ErrorTracker.error(error) end - def log_success(report) - logger.info "Message has been successfully sent to Kafka - topic: #{report.topic_name}, partition: #{report.partition}, offset: #{report.offset}" + def log_success(report, produce_duration) + log_tags = {kafka: log_tags(report, produce_duration)} + + log_with_tags(log_tags) end def format_exception_error(error) @@ -100,6 +111,33 @@ def with_cause?(error) error.respond_to?(:cause) && error.cause.present? end + def log_tags(report, produce_duration) + { + topic: report.topic_name, + partition: report.partition, + offset: report.offset, + produce_duration_ms: produce_duration + } + end + + def log_with_tags(log_tags) + return unless logger.respond_to?(:tagged) + + logger.tagged(log_tags) do + logger.send(:info, MSG_SUCCESS) + end + end + + def measure_time + start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + result = yield + end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + + elapsed_time = end_time - start_time + + [result, elapsed_time] + end + def config Config::Producer end diff --git a/lib/sbmt/kafka_producer/logger.rb b/lib/sbmt/kafka_producer/logger.rb index 4513680..279319f 100644 --- a/lib/sbmt/kafka_producer/logger.rb +++ b/lib/sbmt/kafka_producer/logger.rb @@ -20,6 +20,10 @@ class Logger def add(...) logger.add(...) end + + def tagged(...) + logger.tagged(...) + end end end end diff --git a/lib/sbmt/kafka_producer/testing/configure_producer_client.rb b/lib/sbmt/kafka_producer/testing/configure_producer_client.rb index 12e7a6f..d2a266e 100644 --- a/lib/sbmt/kafka_producer/testing/configure_producer_client.rb +++ b/lib/sbmt/kafka_producer/testing/configure_producer_client.rb @@ -1,8 +1,10 @@ # frozen_string_literal: true class FakeWaterDropClient + Report = Struct.new(:topic_name, :partition, :offset) + def produce_sync(*) - # no op + Report.new("fake_topic", 0, 0) end def produce_async(*) diff --git a/lib/sbmt/kafka_producer/version.rb b/lib/sbmt/kafka_producer/version.rb index dd7c233..8eab7f3 100644 --- a/lib/sbmt/kafka_producer/version.rb +++ b/lib/sbmt/kafka_producer/version.rb @@ -2,6 +2,6 @@ module Sbmt module KafkaProducer - VERSION = "3.0.0" + VERSION = "3.1.0" end end diff --git a/spec/sbmt/kafka_producer/base_producer_spec.rb b/spec/sbmt/kafka_producer/base_producer_spec.rb index 308bb36..87f7c4f 100644 --- a/spec/sbmt/kafka_producer/base_producer_spec.rb +++ b/spec/sbmt/kafka_producer/base_producer_spec.rb @@ -28,26 +28,42 @@ def initialize(message, cause) label: nil, wait: delivery_report) end + let(:logger) { ActiveSupport::TaggedLogging.new(Logger.new($stdout)) } + let(:options) { {seed_brokers: "kafka://kafka:9092"} } before do allow(Sbmt::KafkaProducer::KafkaClientFactory).to receive(:default_client).and_return(client) + allow(Sbmt::KafkaProducer).to receive(:logger).and_return(logger) end describe "#sync_publish" do - let(:options) { {seed_brokers: "kafka://kafka:9092"} } - context "when payload is successfully delivered" do before do allow(client).to receive(:produce_sync).with( payload: payload, topic: "test_topic", seed_brokers: "kafka://kafka:9092" - ).and_return(delivery_report) + ).and_return(delivery_report, 0.1) end it "produces the payload via the client and returns true" do expect(producer.sync_publish(payload, options)).to be(true) end + + it "logs the success message with correct tags" do + expect(logger).to receive(:tagged).with(hash_including( + kafka: hash_including( + topic: "my_topic", + partition: 0, + offset: 0, + produce_duration_ms: kind_of(Numeric) + ) + )).and_yield + + expect(logger).to receive(:info).with("Message has been successfully sent to Kafka") + + producer.sync_publish(payload, options) + end end context "when delivery fails with Kafka::DeliveryFailed" do @@ -71,7 +87,9 @@ def initialize(message, cause) end it "raises an error" do - expect(Sbmt::KafkaProducer.logger).to receive(:error).with(/KAFKA ERROR: StandardError Second Exception. TestWrapError First Exception/) + expect(logger).to receive(:tagged).with(include(:stacktrace)).and_yield + expect(logger).to receive(:error).with(/KAFKA ERROR: StandardError Second Exception. TestWrapError First Exception/) + expect(producer.sync_publish(payload, options)).to be(false) end end @@ -79,20 +97,33 @@ def initialize(message, cause) end describe "#sync_publish!" do - let(:options) { {seed_brokers: "kafka://kafka:9092"} } - context "when payload is successfully delivered" do before do allow(client).to receive(:produce_sync).with( payload: payload, topic: "test_topic", seed_brokers: "kafka://kafka:9092" - ).and_return(delivery_report) + ).and_return(delivery_report, 0.2) end it "produces the payload via the client and returns true" do expect(producer.sync_publish!(payload, options)).to be(true) end + + it "logs the success message with correct tags" do + expect(logger).to receive(:tagged).with(hash_including( + kafka: hash_including( + topic: "my_topic", + partition: 0, + offset: 0, + produce_duration_ms: kind_of(Numeric) + ) + )).and_yield + + expect(logger).to receive(:info).with("Message has been successfully sent to Kafka") + + producer.sync_publish!(payload, options) + end end context "when delivery fails with Kafka::DeliveryFailed" do @@ -107,8 +138,6 @@ def initialize(message, cause) end describe "#async_publish" do - let(:options) { {seed_brokers: "kafka://kafka:9092"} } - context "when payload is successfully delivered" do before do allow(client).to receive(:produce_async).with( @@ -144,7 +173,9 @@ def initialize(message, cause) end it "raises an error" do - expect(Sbmt::KafkaProducer.logger).to receive(:error).with(/KAFKA ERROR: StandardError Second Exception. TestWrapError First Exception/) + expect(logger).to receive(:tagged).with(include(:stacktrace)).and_yield + expect(logger).to receive(:error).with(/KAFKA ERROR: StandardError Second Exception. TestWrapError First Exception/) + expect(producer.async_publish(payload, options)).to be(false) end end @@ -152,8 +183,6 @@ def initialize(message, cause) end describe "#async_publish!" do - let(:options) { {seed_brokers: "kafka://kafka:9092"} } - context "when payload is successfully delivered" do before do allow(client).to receive(:produce_async).with(