From b4dd83d3ce511f8d449710c4d42e87534e0cc754 Mon Sep 17 00:00:00 2001 From: Jan Krutisch Date: Fri, 8 Dec 2023 18:30:12 +0100 Subject: [PATCH 01/11] Implement simple debug backend endpoint for events This currently is missing a queue and calls the backend directly from the agent. Should I implement an events_worker within this PR or in the PR that adds the server backend? --- lib/honeybadger/agent.rb | 3 +-- lib/honeybadger/backend/base.rb | 11 +++++++++++ lib/honeybadger/backend/debug.rb | 7 +++++++ lib/honeybadger/backend/null.rb | 4 ++++ spec/unit/honeybadger/agent_spec.rb | 4 ++-- spec/unit/honeybadger/backend/debug_spec.rb | 12 ++++++++++++ 6 files changed, 37 insertions(+), 4 deletions(-) diff --git a/lib/honeybadger/agent.rb b/lib/honeybadger/agent.rb index d4d51c411..a23180d38 100644 --- a/lib/honeybadger/agent.rb +++ b/lib/honeybadger/agent.rb @@ -375,8 +375,7 @@ def event(event_type, payload={}) logger.error("Event has non-hash payload") return end - log_string = payload.merge({event_type: event_type, ts: ts}).to_json - logger.debug(log_string) + backend.event(event_type, ts, payload) end # @api private diff --git a/lib/honeybadger/backend/base.rb b/lib/honeybadger/backend/base.rb index 34465c005..f0247a576 100644 --- a/lib/honeybadger/backend/base.rb +++ b/lib/honeybadger/backend/base.rb @@ -109,6 +109,17 @@ def track_deployment(payload) notify(:deploys, payload) end + # Send event + # @example + # backend.event("email_received", "2023-03-04T12:12:00+1:00", { subject: 'Re: Aquisition' }) + # + # @param [String] event_type Type of event to send + # @param [String] timestamp RFC3339 conforming formatted timestamp + # @param [Hash] payload Generic payload + def event(event_type, timestamp, payload = {}) + raise NotImplementedError, "must define #event on subclass" + end + private attr_reader :config diff --git a/lib/honeybadger/backend/debug.rb b/lib/honeybadger/backend/debug.rb index 7a401dd8f..1f7d371b5 100644 --- a/lib/honeybadger/backend/debug.rb +++ 
b/lib/honeybadger/backend/debug.rb @@ -17,6 +17,13 @@ def check_in(id) return Response.new(ENV['DEBUG_BACKEND_STATUS'].to_i, nil) if ENV['DEBUG_BACKEND_STATUS'] super end + + def event(event_type, timestamp, payload) + event_hash = payload.merge({event_type: event_type, ts: timestamp}) + logger.unknown("sending event to debug backend with event=#{event_hash.to_json}") + return Response.new(ENV['DEBUG_BACKEND_STATUS'].to_i, nil) if ENV['DEBUG_BACKEND_STATUS'] + super + end end end end diff --git a/lib/honeybadger/backend/null.rb b/lib/honeybadger/backend/null.rb index 17c259c72..1fae58004 100644 --- a/lib/honeybadger/backend/null.rb +++ b/lib/honeybadger/backend/null.rb @@ -24,6 +24,10 @@ def notify(feature, payload) def check_in(id) StubbedResponse.new end + + def event(event_type, timestamp, payload = {}) + StubbedResponse.new + end end end end diff --git a/spec/unit/honeybadger/agent_spec.rb b/spec/unit/honeybadger/agent_spec.rb index 6ad47138e..9a2758341 100644 --- a/spec/unit/honeybadger/agent_spec.rb +++ b/spec/unit/honeybadger/agent_spec.rb @@ -285,14 +285,14 @@ context "#event" do let(:logger) { double(NULL_LOGGER) } - let(:config) { Honeybadger::Config.new(api_key:'fake api key', logger: logger, debug: true) } + let(:config) { Honeybadger::Config.new(api_key:'fake api key', logger: logger, backend: :debug) } let(:instance) { Honeybadger::Agent.new(config) } subject { instance } it "logs an event" do expect(logger).to receive(:add) do |level, msg| - expect(level).to eq(Logger::Severity::INFO) + expect(level).to eq(Logger::Severity::UNKNOWN) expect(msg).to match(/"some_data":"is here"/) expect(msg).to match(/"event_type":"test_event"/) expect(msg).to match(/"ts":/) diff --git a/spec/unit/honeybadger/backend/debug_spec.rb b/spec/unit/honeybadger/backend/debug_spec.rb index 949b4e5ad..f96ff96cc 100644 --- a/spec/unit/honeybadger/backend/debug_spec.rb +++ b/spec/unit/honeybadger/backend/debug_spec.rb @@ -36,4 +36,16 @@ instance.check_in(10) end end + + describe 
"#event" do + it "logs the event" do + expect(logger).to receive(:unknown) do |msg| + expect(msg).to match(/"some_data":"is here"/) + expect(msg).to match(/"event_type":"test_event"/) + expect(msg).to match(/"ts":"test_timestamp"/) + end + + instance.event("test_event", "test_timestamp", {some_data: "is here"}) + end + end end From eada650d311688a32d950d68f5e8498800449e46 Mon Sep 17 00:00:00 2001 From: Jan Krutisch Date: Mon, 11 Dec 2023 20:26:11 +0100 Subject: [PATCH 02/11] Refactor signature of events backend to take only one argument --- lib/honeybadger/agent.rb | 2 +- lib/honeybadger/backend/base.rb | 6 ++---- lib/honeybadger/backend/debug.rb | 5 ++--- lib/honeybadger/backend/null.rb | 2 +- spec/unit/honeybadger/backend/base_spec.rb | 1 + spec/unit/honeybadger/backend/debug_spec.rb | 2 +- 6 files changed, 8 insertions(+), 10 deletions(-) diff --git a/lib/honeybadger/agent.rb b/lib/honeybadger/agent.rb index a23180d38..2793be871 100644 --- a/lib/honeybadger/agent.rb +++ b/lib/honeybadger/agent.rb @@ -375,7 +375,7 @@ def event(event_type, payload={}) logger.error("Event has non-hash payload") return end - backend.event(event_type, ts, payload) + backend.event({event_type: event_type, ts: ts}.merge(payload)) end # @api private diff --git a/lib/honeybadger/backend/base.rb b/lib/honeybadger/backend/base.rb index f0247a576..07e87a017 100644 --- a/lib/honeybadger/backend/base.rb +++ b/lib/honeybadger/backend/base.rb @@ -113,10 +113,8 @@ def track_deployment(payload) # @example # backend.event("email_received", "2023-03-04T12:12:00+1:00", { subject: 'Re: Aquisition' }) # - # @param [String] event_type Type of event to send - # @param [String] timestamp RFC3339 conforming formatted timestamp - # @param [Hash] payload Generic payload - def event(event_type, timestamp, payload = {}) + # @param [Hash] payload event payload + def event(payload) raise NotImplementedError, "must define #event on subclass" end diff --git a/lib/honeybadger/backend/debug.rb 
b/lib/honeybadger/backend/debug.rb index 1f7d371b5..4d557b40a 100644 --- a/lib/honeybadger/backend/debug.rb +++ b/lib/honeybadger/backend/debug.rb @@ -18,9 +18,8 @@ def check_in(id) super end - def event(event_type, timestamp, payload) - event_hash = payload.merge({event_type: event_type, ts: timestamp}) - logger.unknown("sending event to debug backend with event=#{event_hash.to_json}") + def event(payload) + logger.unknown("sending event to debug backend with event=#{payload.to_json}") return Response.new(ENV['DEBUG_BACKEND_STATUS'].to_i, nil) if ENV['DEBUG_BACKEND_STATUS'] super end diff --git a/lib/honeybadger/backend/null.rb b/lib/honeybadger/backend/null.rb index 1fae58004..3f2af2d8c 100644 --- a/lib/honeybadger/backend/null.rb +++ b/lib/honeybadger/backend/null.rb @@ -25,7 +25,7 @@ def check_in(id) StubbedResponse.new end - def event(event_type, timestamp, payload = {}) + def event(payload) StubbedResponse.new end end diff --git a/spec/unit/honeybadger/backend/base_spec.rb b/spec/unit/honeybadger/backend/base_spec.rb index f1d671c8e..582bd9ef3 100644 --- a/spec/unit/honeybadger/backend/base_spec.rb +++ b/spec/unit/honeybadger/backend/base_spec.rb @@ -43,6 +43,7 @@ subject { described_class.new(config) } it { should respond_to :notify } + it { should respond_to :event } describe "#notify" do it "raises NotImplementedError" do diff --git a/spec/unit/honeybadger/backend/debug_spec.rb b/spec/unit/honeybadger/backend/debug_spec.rb index f96ff96cc..675649303 100644 --- a/spec/unit/honeybadger/backend/debug_spec.rb +++ b/spec/unit/honeybadger/backend/debug_spec.rb @@ -45,7 +45,7 @@ expect(msg).to match(/"ts":"test_timestamp"/) end - instance.event("test_event", "test_timestamp", {some_data: "is here"}) + instance.event({event_type: "test_event", ts: "test_timestamp", some_data: "is here"}) end end end From 09a3b6931c8b5436e8b962c70ccf62380388aef0 Mon Sep 17 00:00:00 2001 From: Jan Krutisch Date: Thu, 14 Dec 2023 19:47:37 +0100 Subject: [PATCH 03/11] WIP: Add worker 
--- lib/honeybadger/agent.rb | 10 +- lib/honeybadger/events_worker.rb | 289 +++++++++++++++++++++++++++++++ 2 files changed, 295 insertions(+), 4 deletions(-) create mode 100644 lib/honeybadger/events_worker.rb diff --git a/lib/honeybadger/agent.rb b/lib/honeybadger/agent.rb index 2793be871..e5f5ba3d8 100644 --- a/lib/honeybadger/agent.rb +++ b/lib/honeybadger/agent.rb @@ -7,6 +7,7 @@ require 'honeybadger/plugin' require 'honeybadger/logging' require 'honeybadger/worker' +require 'honeybadger/events_worker' require 'honeybadger/breadcrumbs' module Honeybadger @@ -74,7 +75,7 @@ def initialize(opts = {}) @breadcrumbs = nil end - init_worker + init_workers end # Sends an exception to Honeybadger. Does not report ignored exceptions by @@ -375,7 +376,7 @@ def event(event_type, payload={}) logger.error("Event has non-hash payload") return end - backend.event({event_type: event_type, ts: ts}.merge(payload)) + events_worker.push({event_type: event_type, ts: ts}.merge(payload)) end # @api private @@ -450,7 +451,7 @@ def with_rack_env(rack_env, &block) end # @api private - attr_reader :worker + attr_reader :worker, :events_worker # @api private # @!method init!(...) @@ -487,8 +488,9 @@ def send_now(object) true end - def init_worker + def init_workers @worker = Worker.new(config) + @events_worker = EventsWorker.new(config) end def with_error_handling diff --git a/lib/honeybadger/events_worker.rb b/lib/honeybadger/events_worker.rb new file mode 100644 index 000000000..7d63bd706 --- /dev/null +++ b/lib/honeybadger/events_worker.rb @@ -0,0 +1,289 @@ +require 'forwardable' +require 'net/http' + +require 'honeybadger/logging' + +module Honeybadger + # A concurrent queue to notify the backend. + # @api private + class EventsWorker + extend Forwardable + + include Honeybadger::Logging::Helper + + # Sub-class thread so we have a named thread (useful for debugging in Thread.list). + class Thread < ::Thread; end + + # Used to signal the worker to shutdown. 
+ SHUTDOWN = :__hb_worker_shutdown! + + # The base number for the exponential backoff formula when calculating the + # throttle interval. `1.05 ** throttle` will reach an interval of 2 minutes + # after around 100 429 responses from the server. + BASE_THROTTLE = 1.05 + + SEND_TIMEOUT = 30 + MAX_EVENTS = 200 + MAX_EVENTS_SIZE = 400_000 + + + def initialize(config) + @config = config + @throttle = 0 + @throttle_interval = 0 + @mutex = Mutex.new + @marker = ConditionVariable.new + @queue = Queue.new + @send_queue = Queue.new + @shutdown = false + @start_at = nil + @pid = Process.pid + end + + def push(msg) + return false unless start + + if queue.size >= config.max_queue_size + warn { sprintf('Unable to report error; reached max queue size of %s. id=%s', queue.size, msg.id) } + return false + end + + queue.push(msg) + end + + def send_now(msg) + handle_response(msg, send_to_backend(msg)) + end + + def shutdown(force = false) + d { 'shutting down worker' } + + mutex.synchronize do + @shutdown = true + end + + return true if force + return true unless thread&.alive? + + if throttled? + warn { sprintf('Unable to report %s error(s) to Honeybadger (currently throttled)', queue.size) } unless queue.empty? + return true + end + + info { sprintf('Waiting to report %s error(s) to Honeybadger', queue.size) } unless queue.empty? + + queue.push(SHUTDOWN) + !!thread.join + ensure + queue.clear + kill! + end + + # Blocks until queue is processed up to this point in time. + def flush + mutex.synchronize do + if thread && thread.alive? + queue.push(marker) + marker.wait(mutex) + end + end + end + + def start + return false unless can_start? + + mutex.synchronize do + @shutdown = false + @start_at = nil + + return true if thread&.alive? + + @pid = Process.pid + @thread = Thread.new { run } + end + + true + end + + private + + attr_reader :config, :queue, :pid, :mutex, :marker, :thread, :throttle, + :throttle_interval, :start_at + + def_delegator :config, :backend + + def shutdown? 
+ mutex.synchronize { @shutdown } + end + + def suspended? + mutex.synchronize { start_at && Time.now.to_i < start_at } + end + + def can_start? + return false if shutdown? + return false if suspended? + true + end + + def throttled? + mutex.synchronize { throttle > 0 } + end + + def kill! + d { 'killing worker thread' } + + if thread + Thread.kill(thread) + thread.join # Allow ensure blocks to execute. + end + + true + end + + def suspend(interval) + mutex.synchronize do + @start_at = Time.now.to_i + interval + queue.clear + end + + # Must be performed last since this may kill the current thread. + kill! + end + + def run + begin + d { 'worker started' } + Thread.current.thread_variable_set(:last_sent, Time.now) + Thread.current.thread_variable_set(:send_queue, Queue.new) + Thread.current.thread_variable_set(:send_queue_byte_size, 0) + + loop do + case msg = queue.pop + when SHUTDOWN then break + when ConditionVariable then signal_marker(msg) + else work(msg) + end + end + ensure + d { 'stopping worker' } + end + rescue Exception => e + error { + msg = "Error in worker thread (shutting down) class=%s message=%s\n\t%s" + sprintf(msg, e.class, e.message.dump, Array(e.backtrace).join("\n\t")) + } + ensure + release_marker + end + + def enqueue_msg(msg) + queue = Thread.current.thread_variable_get(:send_queue) + queue_byte_size = Thread.current.thread_variable_get(:send_queue_byte_size) + size = msg.to_json.bytesize + 1 + Thread.current.thread_variable_set(:send_queue_byte_size, queue_byte_size + size) + queue << msg + end + + def check_and_send + queue = Thread.current.thread_variable_get(:send_queue) + return if queue.empty? + last_sent = Thread.current.thread_variable_get(:last_sent) + queue_byte_size = Thread.current.thread_variable_get(:send_queue_byte_size) + + if queue.size >= MAX_EVENTS || (Time.now.to_i - last_sent.to_i) >= SEND_TIMEOUT || queue_byte_size >= MAX_EVENTS_SIZE + to_send = [] + while !queue.empty? 
+ to_send << queue.pop + end + send_now(to_send) + end + end + + def work(msg) + enqueue_msg(msg) + check_and_send + + if shutdown? && throttled? + warn { sprintf('Unable to report %s error(s) to Honeybadger (currently throttled)', queue.size) } if queue.size > 1 + kill! + return + end + + sleep(throttle_interval) + rescue StandardError => e + error { + msg = "Error in worker thread class=%s message=%s\n\t%s" + sprintf(msg, e.class, e.message.dump, Array(e.backtrace).join("\n\t")) + } + end + + def send_to_backend(msg) + d { 'events_worker sending to backend' } + events_backend.send_event(msg) + end + + def calc_throttle_interval + ((BASE_THROTTLE ** throttle) - 1).round(3) + end + + def inc_throttle + mutex.synchronize do + @throttle += 1 + @throttle_interval = calc_throttle_interval + throttle + end + end + + def dec_throttle + mutex.synchronize do + return nil if throttle == 0 + @throttle -= 1 + @throttle_interval = calc_throttle_interval + throttle + end + end + + def handle_response(msg, response) + d { sprintf('events_worker response code=%s message=%s', response.code, response.message.to_s.dump) } + + case response.code + when 429, 503 + throttle = inc_throttle + warn { sprintf('Event send failed: project is sending too many errors. id=%s code=%s throttle=%s interval=%s', msg.id, response.code, throttle, throttle_interval) } + when 402 + warn { sprintf('Event send failed: payment is required. id=%s code=%s', msg.id, response.code) } + suspend(3600) + when 403 + warn { sprintf('Event send failed: API key is invalid. id=%s code=%s', msg.id, response.code) } + suspend(3600) + when 413 + warn { sprintf('Event send failed: Payload is too large. 
id=%s code=%s', msg.id, response.code) } + when 201 + if throttle = dec_throttle + info { sprintf('Success ⚡ Event sent code=%s throttle=%s interval=%s', response.code, throttle, throttle_interval) } + else + info { sprintf('Success ⚡ Event sent code=%s', response.code) } + end + when :stubbed + info { sprintf('Success ⚡ Development mode is enabled; This event will be sent after app is deployed.') } + when :error + warn { sprintf('Event send failed: an unknown error occurred. code=%s error=%s', response.code, response.message.to_s.dump) } + else + warn { sprintf('Event send failed: unknown response from server. code=%s', response.code) } + end + end + + # Release the marker. Important to perform during cleanup when shutting + # down, otherwise it could end up waiting indefinitely. + def release_marker + signal_marker(marker) + end + + def signal_marker(marker) + mutex.synchronize do + marker.signal + end + end + end +end From 13e4a1eedad5bb5b2c8714cf220f2495c9d01f8f Mon Sep 17 00:00:00 2001 From: Jan Krutisch Date: Fri, 15 Dec 2023 10:16:05 +0100 Subject: [PATCH 04/11] WIP start of worker spec --- lib/honeybadger/events_worker.rb | 20 +- spec/unit/honeybadger/agent_spec.rb | 18 +- spec/unit/honeybadger/events_worker_spec.rb | 356 ++++++++++++++++++++ 3 files changed, 374 insertions(+), 20 deletions(-) create mode 100644 spec/unit/honeybadger/events_worker_spec.rb diff --git a/lib/honeybadger/events_worker.rb b/lib/honeybadger/events_worker.rb index 7d63bd706..32916c293 100644 --- a/lib/honeybadger/events_worker.rb +++ b/lib/honeybadger/events_worker.rb @@ -155,8 +155,7 @@ def run begin d { 'worker started' } Thread.current.thread_variable_set(:last_sent, Time.now) - Thread.current.thread_variable_set(:send_queue, Queue.new) - Thread.current.thread_variable_set(:send_queue_byte_size, 0) + Thread.current.thread_variable_set(:send_queue, []) loop do case msg = queue.pop @@ -179,24 +178,19 @@ def run def enqueue_msg(msg) queue = 
Thread.current.thread_variable_get(:send_queue) - queue_byte_size = Thread.current.thread_variable_get(:send_queue_byte_size) - size = msg.to_json.bytesize + 1 - Thread.current.thread_variable_set(:send_queue_byte_size, queue_byte_size + size) queue << msg + # queue_byte_size = Thread.current.thread_variable_get(:send_queue_byte_size) + # size = msg.to_json.bytesize + 1 + # Thread.current.thread_variable_set(:send_queue_byte_size, queue_byte_size + size) end def check_and_send queue = Thread.current.thread_variable_get(:send_queue) return if queue.empty? last_sent = Thread.current.thread_variable_get(:last_sent) - queue_byte_size = Thread.current.thread_variable_get(:send_queue_byte_size) - - if queue.size >= MAX_EVENTS || (Time.now.to_i - last_sent.to_i) >= SEND_TIMEOUT || queue_byte_size >= MAX_EVENTS_SIZE - to_send = [] - while !queue.empty? - to_send << queue.pop - end - send_now(to_send) + if queue.length >= MAX_EVENTS || (Time.now.to_i - last_sent.to_i) >= SEND_TIMEOUT + send_now(queue) + queue.clear end end diff --git a/spec/unit/honeybadger/agent_spec.rb b/spec/unit/honeybadger/agent_spec.rb index 9a2758341..264bc8ed2 100644 --- a/spec/unit/honeybadger/agent_spec.rb +++ b/spec/unit/honeybadger/agent_spec.rb @@ -1,4 +1,5 @@ require 'honeybadger/agent' +require 'honeybadger/events_worker' require 'timecop' describe Honeybadger::Agent do @@ -284,18 +285,21 @@ end context "#event" do - let(:logger) { double(NULL_LOGGER) } - let(:config) { Honeybadger::Config.new(api_key:'fake api key', logger: logger, backend: :debug) } + let(:config) { Honeybadger::Config.new(api_key:'fake api key', logger: NULL_LOGGER, backend: :debug) } + let(:events_worker) { double(Honeybadger::EventsWorker.new(config)) } let(:instance) { Honeybadger::Agent.new(config) } subject { instance } + before do + allow(instance).to receive(:events_worker).and_return(events_worker) + end + it "logs an event" do - expect(logger).to receive(:add) do |level, msg| - expect(level).to 
eq(Logger::Severity::UNKNOWN) - expect(msg).to match(/"some_data":"is here"/) - expect(msg).to match(/"event_type":"test_event"/) - expect(msg).to match(/"ts":/) + expect(events_worker).to receive(:push) do |msg| + expect(msg[:event_type]).to eq("test_event") + expect(msg[:some_data]).to eq("is here") + expect(msg[:ts]).not_to be_nil end subject.event("test_event", some_data: "is here") end diff --git a/spec/unit/honeybadger/events_worker_spec.rb b/spec/unit/honeybadger/events_worker_spec.rb new file mode 100644 index 000000000..8a033f699 --- /dev/null +++ b/spec/unit/honeybadger/events_worker_spec.rb @@ -0,0 +1,356 @@ +require 'timecop' +require 'thread' + +require 'honeybadger/events_worker' +require 'honeybadger/config' +require 'honeybadger/backend' + +describe Honeybadger::EventsWorker do + let!(:instance) { described_class.new(config) } + let(:config) { Honeybadger::Config.new(logger: NULL_LOGGER, debug: true, backend: 'null') } + let(:event) { {event_type: "test", ts: "not-important"} } + + subject { instance } + + after do + Thread.list.each do |thread| + next unless thread.kind_of?(Honeybadger::EventsWorker::Thread) + Thread.kill(thread) + end + end + + context "when an exception happens in the worker loop" do + before do + allow(instance.send(:queue)).to receive(:pop).and_raise('fail') + end + + it "does not raise when shutting down" do + instance.push(event) + + expect { instance.shutdown }.not_to raise_error + end + + it "exits the loop" do + instance.push(event) + instance.flush + + sleep(0.1) + expect(instance.send(:thread)).not_to be_alive + end + + it "logs the error" do + allow(config.logger).to receive(:error) + expect(config.logger).to receive(:error).with(/error/i) + + instance.push(event) + instance.flush + end + end + + context "when an exception happens during processing" do + before do + allow(instance).to receive(:sleep) + allow(instance).to receive(:handle_response).and_raise('fail') + end + + def flush + instance.push(event) + 
instance.flush + end + + it "does not raise when shutting down" do + flush + expect { instance.shutdown }.not_to raise_error + end + + it "does not exit the loop" do + flush + expect(instance.send(:thread)).to be_alive + end + + # it "logs the error" do + # allow(config.logger).to receive(:error) + # expect(config.logger).to receive(:error).with(/error/i) + # flush + # end + end + + describe "#initialize" do + describe "#queue" do + subject { instance.send(:queue) } + + it { should be_a Queue } + end + + describe "#backend" do + subject { instance.send(:backend) } + + before do + allow(Honeybadger::Backend::Null).to receive(:new).with(config).and_return(config.backend) + end + + it { should be_a Honeybadger::Backend::Base } + + it "is initialized from config" do + should eq config.backend + end + end + end + + # describe "#push" do + # it "flushes payload to backend" do + # expect(instance.send(:backend)).to receive(:notify).with(:notices, obj).and_call_original + # expect(instance.push(obj)).not_to eq false + # instance.flush + # end + + # context "when not started" do + # before do + # allow(instance).to receive(:start).and_return false + # end + + # it "rejects push" do + # expect(instance.send(:queue)).not_to receive(:push) + # expect(instance.push(obj)).to eq false + # end + # end + + # context "when queue is full" do + # before do + # allow(config).to receive(:max_queue_size).and_return(5) + # allow(instance).to receive(:queue).and_return(double(size: 5)) + # end + + # it "rejects the push" do + # expect(instance.send(:queue)).not_to receive(:push) + # expect(instance.push(obj)).to eq false + # end + + # it "warns the logger" do + # allow(config.logger).to receive(:warn) + # expect(config.logger).to receive(:warn).with(/reached max/i) + # instance.push(obj) + # end + # end + # end + + # describe "#start" do + # it "starts the thread" do + # expect { subject.start }.to change(subject, :thread).to(kind_of(Thread)) + # end + + # it "changes the pid to the 
current pid" do + # allow(Process).to receive(:pid).and_return(:expected) + # expect { subject.start }.to change(subject, :pid).to(:expected) + # end + + # context "when shutdown" do + # before do + # subject.shutdown + # end + + # it "doesn't start" do + # expect { subject.start }.not_to change(subject, :thread) + # end + # end + + # context "when suspended" do + # before do + # subject.send(:suspend, 300) + # end + + # context "and restart is in the future" do + # it "doesn't start" do + # expect { subject.start }.not_to change(subject, :thread) + # end + # end + + # context "and restart is in the past" do + # it "starts the thread" do + # Timecop.travel(Time.now + 301) do + # expect { subject.start }.to change(subject, :thread).to(kind_of(Thread)) + # end + # end + # end + # end + # end + + # describe "#shutdown" do + # before { subject.start } + + # it "blocks until queue is processed" do + # expect(subject.send(:backend)).to receive(:notify).with(kind_of(Symbol), obj).and_call_original + # subject.push(obj) + # subject.shutdown + # end + + # it "stops the thread" do + # subject.shutdown + + # sleep(0.1) + # expect(subject.send(:thread)).not_to be_alive + # end + + # context "when previously throttled" do + # before do + # 100.times { subject.send(:inc_throttle) } + # subject.push(obj) + # sleep(0.01) # Pause to allow throttle to activate + # end + + # it "shuts down immediately" do + # expect(subject.send(:backend)).not_to receive(:notify) + # subject.push(obj) + # subject.shutdown + # end + + # it "does not warn the logger when the queue is empty" do + # allow(config.logger).to receive(:warn) + # expect(config.logger).not_to receive(:warn) + # subject.shutdown + # end + + # it "warns the logger when queue has items" do + # subject.push(obj) + # allow(config.logger).to receive(:warn) + # expect(config.logger).to receive(:warn).with(/throttled/i) + # subject.shutdown + # end + # end + + # context "when throttled during shutdown" do + # before do + # 
allow(subject.send(:backend)).to receive(:notify).with(:notices, obj).and_return(Honeybadger::Backend::Response.new(429) ) + # end + + # it "shuts down immediately" do + # expect(subject.send(:backend)).to receive(:notify).exactly(1).times + # 5.times { subject.push(obj) } + # subject.shutdown + # end + + # it "does not warn the logger when the queue is empty" do + # allow(config.logger).to receive(:warn) + # expect(config.logger).not_to receive(:warn).with(/throttled/) + + # subject.push(obj) + # subject.shutdown + # end + + # it "warns the logger when the queue has additional items" do + # allow(config.logger).to receive(:warn) + # expect(config.logger).to receive(:warn).with(/throttled/i) + + # 30.times do + # subject.push(obj) + # end + + # subject.shutdown + # end + # end + # end + + # describe "#flush" do + # it "blocks until queue is flushed" do + # expect(subject.send(:backend)).to receive(:notify).with(kind_of(Symbol), obj).and_call_original + # subject.push(obj) + # subject.flush + # end + # end + + # describe "#handle_response" do + # def handle_response + # instance.send(:handle_response, obj, response) + # end + + # before do + # allow(instance).to receive(:suspend).and_return true + # end + + # context "when 429" do + # let(:response) { Honeybadger::Backend::Response.new(429) } + + # it "adds throttle" do + # expect { handle_response }.to change(instance, :throttle_interval).by(0.05) + # end + # end + + # context "when 402" do + # let(:response) { Honeybadger::Backend::Response.new(402) } + + # it "shuts down the worker" do + # expect(instance).to receive(:suspend) + # handle_response + # end + + # it "warns the logger" do + # expect(config.logger).to receive(:warn).with(/payment/) + # handle_response + # end + # end + + # context "when 403" do + # let(:response) { Honeybadger::Backend::Response.new(403, %({"error":"unauthorized"})) } + + # it "shuts down the worker" do + # expect(instance).to receive(:suspend) + # handle_response + # end + + # it 
"warns the logger" do + # expect(config.logger).to receive(:warn).with(/invalid/) + # handle_response + # end + # end + + # context "when 413" do + # let(:response) { Honeybadger::Backend::Response.new(413, %({"error":"Payload exceeds maximum size"})) } + + # it "warns the logger" do + # expect(config.logger).to receive(:warn).with(/too large/) + # handle_response + # end + # end + + # context "when 201" do + # let(:response) { Honeybadger::Backend::Response.new(201) } + + # context "and there is no throttle" do + # it "doesn't change throttle" do + # expect { handle_response }.not_to change(instance, :throttle_interval) + # end + # end + + # context "and a throttle is set" do + # before { instance.send(:inc_throttle) } + + # it "removes throttle" do + # expect { handle_response }.to change(instance, :throttle_interval).by(-0.05) + # end + # end + + # it "doesn't warn" do + # expect(config.logger).not_to receive(:warn) + # handle_response + # end + # end + + # context "when unknown" do + # let(:response) { Honeybadger::Backend::Response.new(418) } + + # it "warns the logger" do + # expect(config.logger).to receive(:warn).with(/failed/) + # handle_response + # end + # end + + # context "when error" do + # let(:response) { Honeybadger::Backend::Response.new(:error, nil, 'test error message') } + + # it "warns the logger" do + # expect(config.logger).to receive(:warn).with(/test error message/) + # handle_response + # end + # end + # end +end From 5dfb849913f040d6bd5178fd42c54ecc44e60cf2 Mon Sep 17 00:00:00 2001 From: Jan Krutisch Date: Thu, 4 Jan 2024 12:34:20 +0100 Subject: [PATCH 05/11] Worker spec successfully duplicated --- lib/honeybadger/events_worker.rb | 50 +- spec/unit/honeybadger/events_worker_spec.rb | 514 ++++++++++---------- 2 files changed, 290 insertions(+), 274 deletions(-) diff --git a/lib/honeybadger/events_worker.rb b/lib/honeybadger/events_worker.rb index 32916c293..33d8a42fd 100644 --- a/lib/honeybadger/events_worker.rb +++ 
b/lib/honeybadger/events_worker.rb @@ -16,12 +16,14 @@ class Thread < ::Thread; end # Used to signal the worker to shutdown. SHUTDOWN = :__hb_worker_shutdown! + FLUSH = :__hb_worker_flush! # The base number for the exponential backoff formula when calculating the # throttle interval. `1.05 ** throttle` will reach an interval of 2 minutes # after around 100 429 responses from the server. BASE_THROTTLE = 1.05 + # TODO: These could be configurable? SEND_TIMEOUT = 30 MAX_EVENTS = 200 MAX_EVENTS_SIZE = 400_000 @@ -44,7 +46,7 @@ def push(msg) return false unless start if queue.size >= config.max_queue_size - warn { sprintf('Unable to report error; reached max queue size of %s. id=%s', queue.size, msg.id) } + warn { sprintf('Unable to send event; reached max queue size of %s.', queue.size) } return false end @@ -52,7 +54,7 @@ def push(msg) end def send_now(msg) - handle_response(msg, send_to_backend(msg)) + handle_response(send_to_backend(msg)) end def shutdown(force = false) @@ -66,12 +68,12 @@ def shutdown(force = false) return true unless thread&.alive? if throttled? - warn { sprintf('Unable to report %s error(s) to Honeybadger (currently throttled)', queue.size) } unless queue.empty? + warn { sprintf('Unable to send %s event(s) to Honeybadger (currently throttled)', queue.size) } unless queue.empty? return true end - info { sprintf('Waiting to report %s error(s) to Honeybadger', queue.size) } unless queue.empty? - + info { sprintf('Waiting to send %s events(s) to Honeybadger', queue.size) } unless queue.empty? + queue.push(FLUSH) queue.push(SHUTDOWN) !!thread.join ensure @@ -83,6 +85,7 @@ def shutdown(force = false) def flush mutex.synchronize do if thread && thread.alive? 
+ queue.push(FLUSH) queue.push(marker) marker.wait(mutex) end @@ -160,6 +163,7 @@ def run loop do case msg = queue.pop when SHUTDOWN then break + when FLUSH then flush_send_queue when ConditionVariable then signal_marker(msg) else work(msg) end @@ -185,12 +189,12 @@ def enqueue_msg(msg) end def check_and_send - queue = Thread.current.thread_variable_get(:send_queue) - return if queue.empty? + send_queue = Thread.current.thread_variable_get(:send_queue) + return if send_queue.empty? last_sent = Thread.current.thread_variable_get(:last_sent) - if queue.length >= MAX_EVENTS || (Time.now.to_i - last_sent.to_i) >= SEND_TIMEOUT - send_now(queue) - queue.clear + if send_queue.length >= MAX_EVENTS || (Time.now.to_i - last_sent.to_i) >= SEND_TIMEOUT + send_now(send_queue) + send_queue.clear end end @@ -199,7 +203,7 @@ def work(msg) check_and_send if shutdown? && throttled? - warn { sprintf('Unable to report %s error(s) to Honeybadger (currently throttled)', queue.size) } if queue.size > 1 + warn { sprintf('Unable to semd %s events(s) to Honeybadger (currently throttled)', queue.size) } if queue.size > 1 kill! return end @@ -212,9 +216,21 @@ def work(msg) } end + def flush_send_queue + send_queue = Thread.current.thread_variable_get(:send_queue) + return if send_queue.empty? 
+ send_now(send_queue) + send_queue.clear + rescue StandardError => e + error { + msg = "Error in worker thread class=%s message=%s\n\t%s" + sprintf(msg, e.class, e.message.dump, Array(e.backtrace).join("\n\t")) + } + end + def send_to_backend(msg) d { 'events_worker sending to backend' } - events_backend.send_event(msg) + backend.event(msg) end def calc_throttle_interval @@ -238,21 +254,21 @@ def dec_throttle end end - def handle_response(msg, response) + def handle_response(response) d { sprintf('events_worker response code=%s message=%s', response.code, response.message.to_s.dump) } case response.code when 429, 503 throttle = inc_throttle - warn { sprintf('Event send failed: project is sending too many errors. id=%s code=%s throttle=%s interval=%s', msg.id, response.code, throttle, throttle_interval) } + warn { sprintf('Event send failed: project is sending too many events. code=%s throttle=%s interval=%s', response.code, throttle, throttle_interval) } when 402 - warn { sprintf('Event send failed: payment is required. id=%s code=%s', msg.id, response.code) } + warn { sprintf('Event send failed: payment is required. code=%s', response.code) } suspend(3600) when 403 - warn { sprintf('Event send failed: API key is invalid. id=%s code=%s', msg.id, response.code) } + warn { sprintf('Event send failed: API key is invalid. code=%s', response.code) } suspend(3600) when 413 - warn { sprintf('Event send failed: Payload is too large. id=%s code=%s', msg.id, response.code) } + warn { sprintf('Event send failed: Payload is too large. 
code=%s', response.code) } when 201 if throttle = dec_throttle info { sprintf('Success ⚡ Event sent code=%s throttle=%s interval=%s', response.code, throttle, throttle_interval) } diff --git a/spec/unit/honeybadger/events_worker_spec.rb b/spec/unit/honeybadger/events_worker_spec.rb index 8a033f699..753f7232d 100644 --- a/spec/unit/honeybadger/events_worker_spec.rb +++ b/spec/unit/honeybadger/events_worker_spec.rb @@ -34,7 +34,7 @@ instance.push(event) instance.flush - sleep(0.1) + sleep(0.2) expect(instance.send(:thread)).not_to be_alive end @@ -97,260 +97,260 @@ def flush end end - # describe "#push" do - # it "flushes payload to backend" do - # expect(instance.send(:backend)).to receive(:notify).with(:notices, obj).and_call_original - # expect(instance.push(obj)).not_to eq false - # instance.flush - # end - - # context "when not started" do - # before do - # allow(instance).to receive(:start).and_return false - # end - - # it "rejects push" do - # expect(instance.send(:queue)).not_to receive(:push) - # expect(instance.push(obj)).to eq false - # end - # end - - # context "when queue is full" do - # before do - # allow(config).to receive(:max_queue_size).and_return(5) - # allow(instance).to receive(:queue).and_return(double(size: 5)) - # end - - # it "rejects the push" do - # expect(instance.send(:queue)).not_to receive(:push) - # expect(instance.push(obj)).to eq false - # end - - # it "warns the logger" do - # allow(config.logger).to receive(:warn) - # expect(config.logger).to receive(:warn).with(/reached max/i) - # instance.push(obj) - # end - # end - # end - - # describe "#start" do - # it "starts the thread" do - # expect { subject.start }.to change(subject, :thread).to(kind_of(Thread)) - # end - - # it "changes the pid to the current pid" do - # allow(Process).to receive(:pid).and_return(:expected) - # expect { subject.start }.to change(subject, :pid).to(:expected) - # end - - # context "when shutdown" do - # before do - # subject.shutdown - # end - - # it 
"doesn't start" do - # expect { subject.start }.not_to change(subject, :thread) - # end - # end - - # context "when suspended" do - # before do - # subject.send(:suspend, 300) - # end - - # context "and restart is in the future" do - # it "doesn't start" do - # expect { subject.start }.not_to change(subject, :thread) - # end - # end - - # context "and restart is in the past" do - # it "starts the thread" do - # Timecop.travel(Time.now + 301) do - # expect { subject.start }.to change(subject, :thread).to(kind_of(Thread)) - # end - # end - # end - # end - # end - - # describe "#shutdown" do - # before { subject.start } - - # it "blocks until queue is processed" do - # expect(subject.send(:backend)).to receive(:notify).with(kind_of(Symbol), obj).and_call_original - # subject.push(obj) - # subject.shutdown - # end - - # it "stops the thread" do - # subject.shutdown - - # sleep(0.1) - # expect(subject.send(:thread)).not_to be_alive - # end - - # context "when previously throttled" do - # before do - # 100.times { subject.send(:inc_throttle) } - # subject.push(obj) - # sleep(0.01) # Pause to allow throttle to activate - # end - - # it "shuts down immediately" do - # expect(subject.send(:backend)).not_to receive(:notify) - # subject.push(obj) - # subject.shutdown - # end - - # it "does not warn the logger when the queue is empty" do - # allow(config.logger).to receive(:warn) - # expect(config.logger).not_to receive(:warn) - # subject.shutdown - # end - - # it "warns the logger when queue has items" do - # subject.push(obj) - # allow(config.logger).to receive(:warn) - # expect(config.logger).to receive(:warn).with(/throttled/i) - # subject.shutdown - # end - # end - - # context "when throttled during shutdown" do - # before do - # allow(subject.send(:backend)).to receive(:notify).with(:notices, obj).and_return(Honeybadger::Backend::Response.new(429) ) - # end - - # it "shuts down immediately" do - # expect(subject.send(:backend)).to receive(:notify).exactly(1).times - # 
5.times { subject.push(obj) } - # subject.shutdown - # end - - # it "does not warn the logger when the queue is empty" do - # allow(config.logger).to receive(:warn) - # expect(config.logger).not_to receive(:warn).with(/throttled/) - - # subject.push(obj) - # subject.shutdown - # end - - # it "warns the logger when the queue has additional items" do - # allow(config.logger).to receive(:warn) - # expect(config.logger).to receive(:warn).with(/throttled/i) - - # 30.times do - # subject.push(obj) - # end - - # subject.shutdown - # end - # end - # end - - # describe "#flush" do - # it "blocks until queue is flushed" do - # expect(subject.send(:backend)).to receive(:notify).with(kind_of(Symbol), obj).and_call_original - # subject.push(obj) - # subject.flush - # end - # end - - # describe "#handle_response" do - # def handle_response - # instance.send(:handle_response, obj, response) - # end - - # before do - # allow(instance).to receive(:suspend).and_return true - # end - - # context "when 429" do - # let(:response) { Honeybadger::Backend::Response.new(429) } - - # it "adds throttle" do - # expect { handle_response }.to change(instance, :throttle_interval).by(0.05) - # end - # end - - # context "when 402" do - # let(:response) { Honeybadger::Backend::Response.new(402) } - - # it "shuts down the worker" do - # expect(instance).to receive(:suspend) - # handle_response - # end - - # it "warns the logger" do - # expect(config.logger).to receive(:warn).with(/payment/) - # handle_response - # end - # end - - # context "when 403" do - # let(:response) { Honeybadger::Backend::Response.new(403, %({"error":"unauthorized"})) } - - # it "shuts down the worker" do - # expect(instance).to receive(:suspend) - # handle_response - # end - - # it "warns the logger" do - # expect(config.logger).to receive(:warn).with(/invalid/) - # handle_response - # end - # end - - # context "when 413" do - # let(:response) { Honeybadger::Backend::Response.new(413, %({"error":"Payload exceeds maximum 
size"})) } - - # it "warns the logger" do - # expect(config.logger).to receive(:warn).with(/too large/) - # handle_response - # end - # end - - # context "when 201" do - # let(:response) { Honeybadger::Backend::Response.new(201) } - - # context "and there is no throttle" do - # it "doesn't change throttle" do - # expect { handle_response }.not_to change(instance, :throttle_interval) - # end - # end - - # context "and a throttle is set" do - # before { instance.send(:inc_throttle) } - - # it "removes throttle" do - # expect { handle_response }.to change(instance, :throttle_interval).by(-0.05) - # end - # end - - # it "doesn't warn" do - # expect(config.logger).not_to receive(:warn) - # handle_response - # end - # end - - # context "when unknown" do - # let(:response) { Honeybadger::Backend::Response.new(418) } - - # it "warns the logger" do - # expect(config.logger).to receive(:warn).with(/failed/) - # handle_response - # end - # end - - # context "when error" do - # let(:response) { Honeybadger::Backend::Response.new(:error, nil, 'test error message') } - - # it "warns the logger" do - # expect(config.logger).to receive(:warn).with(/test error message/) - # handle_response - # end - # end - # end + describe "#push" do + it "flushes payload to backend" do + expect(instance.send(:backend)).to receive(:event).with([event]).and_call_original + expect(instance.push(event)).not_to eq false + instance.flush + end + + context "when not started" do + before do + allow(instance).to receive(:start).and_return false + end + + it "rejects push" do + expect(instance.send(:queue)).not_to receive(:push) + expect(instance.push(event)).to eq false + end + end + + context "when queue is full" do + before do + allow(config).to receive(:max_queue_size).and_return(5) + allow(instance).to receive(:queue).and_return(double(size: 5)) + end + + it "rejects the push" do + expect(instance.send(:queue)).not_to receive(:push) + expect(instance.push(event)).to eq false + end + + it "warns the 
logger" do + allow(config.logger).to receive(:warn) + expect(config.logger).to receive(:warn).with(/reached max/i) + instance.push(event) + end + end + end + + describe "#start" do + it "starts the thread" do + expect { subject.start }.to change(subject, :thread).to(kind_of(Thread)) + end + + it "changes the pid to the current pid" do + allow(Process).to receive(:pid).and_return(:expected) + expect { subject.start }.to change(subject, :pid).to(:expected) + end + + context "when shutdown" do + before do + subject.shutdown + end + + it "doesn't start" do + expect { subject.start }.not_to change(subject, :thread) + end + end + + context "when suspended" do + before do + subject.send(:suspend, 300) + end + + context "and restart is in the future" do + it "doesn't start" do + expect { subject.start }.not_to change(subject, :thread) + end + end + + context "and restart is in the past" do + it "starts the thread" do + Timecop.travel(Time.now + 301) do + expect { subject.start }.to change(subject, :thread).to(kind_of(Thread)) + end + end + end + end + end + + describe "#shutdown" do + before { subject.start } + + it "blocks until queue is processed" do + expect(subject.send(:backend)).to receive(:event).with([event]).and_call_original + subject.push(event) + subject.shutdown + end + + it "stops the thread" do + subject.shutdown + + sleep(0.1) + expect(subject.send(:thread)).not_to be_alive + end + + context "when previously throttled" do + before do + 100.times { subject.send(:inc_throttle) } + subject.push(event) + sleep(0.01) # Pause to allow throttle to activate + end + + it "shuts down immediately" do + expect(subject.send(:backend)).not_to receive(:event) + subject.push(event) + subject.shutdown + end + + it "does not warn the logger when the queue is empty" do + allow(config.logger).to receive(:warn) + expect(config.logger).not_to receive(:warn) + subject.shutdown + end + + it "warns the logger when queue has items" do + subject.push(event) + allow(config.logger).to 
receive(:warn) + expect(config.logger).to receive(:warn).with(/throttled/i) + subject.shutdown + end + end + + context "when throttled during shutdown" do + before do + allow(subject.send(:backend)).to receive(:event).with(anything).and_return(Honeybadger::Backend::Response.new(429) ) + end + + it "shuts down immediately" do + expect(subject.send(:backend)).to receive(:event).exactly(1).times + 5.times { subject.push(event) } + subject.shutdown + end + + it "does not warn the logger when the queue is empty" do + allow(config.logger).to receive(:warn) + expect(config.logger).not_to receive(:warn).with(/throttled/) + + subject.push(event) + subject.shutdown + end + + it "warns the logger when the queue has additional items" do + allow(config.logger).to receive(:warn) + expect(config.logger).to receive(:warn).with(/throttled/i) + 100.times { subject.send(:inc_throttle) } + 10.times do + subject.push(event) + end + + subject.shutdown + end + end + end + + describe "#flush" do + it "blocks until queue is flushed" do + expect(subject.send(:backend)).to receive(:event).with([event]).and_call_original + subject.push(event) + subject.flush + end + end + + describe "#handle_response" do + def handle_response + instance.send(:handle_response, response) + end + + before do + allow(instance).to receive(:suspend).and_return true + end + + context "when 429" do + let(:response) { Honeybadger::Backend::Response.new(429) } + + it "adds throttle" do + expect { handle_response }.to change(instance, :throttle_interval).by(0.05) + end + end + + context "when 402" do + let(:response) { Honeybadger::Backend::Response.new(402) } + + it "shuts down the worker" do + expect(instance).to receive(:suspend) + handle_response + end + + it "warns the logger" do + expect(config.logger).to receive(:warn).with(/payment/) + handle_response + end + end + + context "when 403" do + let(:response) { Honeybadger::Backend::Response.new(403, %({"error":"unauthorized"})) } + + it "shuts down the worker" do + 
expect(instance).to receive(:suspend) + handle_response + end + + it "warns the logger" do + expect(config.logger).to receive(:warn).with(/invalid/) + handle_response + end + end + + context "when 413" do + let(:response) { Honeybadger::Backend::Response.new(413, %({"error":"Payload exceeds maximum size"})) } + + it "warns the logger" do + expect(config.logger).to receive(:warn).with(/too large/) + handle_response + end + end + + context "when 201" do + let(:response) { Honeybadger::Backend::Response.new(201) } + + context "and there is no throttle" do + it "doesn't change throttle" do + expect { handle_response }.not_to change(instance, :throttle_interval) + end + end + + context "and a throttle is set" do + before { instance.send(:inc_throttle) } + + it "removes throttle" do + expect { handle_response }.to change(instance, :throttle_interval).by(-0.05) + end + end + + it "doesn't warn" do + expect(config.logger).not_to receive(:warn) + handle_response + end + end + + context "when unknown" do + let(:response) { Honeybadger::Backend::Response.new(418) } + + it "warns the logger" do + expect(config.logger).to receive(:warn).with(/failed/) + handle_response + end + end + + context "when error" do + let(:response) { Honeybadger::Backend::Response.new(:error, nil, 'test error message') } + + it "warns the logger" do + expect(config.logger).to receive(:warn).with(/test error message/) + handle_response + end + end + end end From 75fd0017f4d580db042094f4cf6d4144a413e484 Mon Sep 17 00:00:00 2001 From: Jan Krutisch Date: Thu, 4 Jan 2024 17:37:52 +0100 Subject: [PATCH 06/11] Implement timeout mechanism using separate thread Given that the worker relies on the Queue as the main scheduling mechanism I saw no other way than to start a second thread that occasionally throws a message into the queue to check if the timeout is reached. This seems to work in testing. 
--- lib/honeybadger/backend/base.rb | 5 +- lib/honeybadger/config/defaults.rb | 10 +++ lib/honeybadger/events_worker.rb | 94 ++++++++++++++------- spec/unit/honeybadger/events_worker_spec.rb | 41 ++++++++- 4 files changed, 115 insertions(+), 35 deletions(-) diff --git a/lib/honeybadger/backend/base.rb b/lib/honeybadger/backend/base.rb index 07e87a017..a2ce5022d 100644 --- a/lib/honeybadger/backend/base.rb +++ b/lib/honeybadger/backend/base.rb @@ -111,9 +111,10 @@ def track_deployment(payload) # Send event # @example - # backend.event("email_received", "2023-03-04T12:12:00+1:00", { subject: 'Re: Aquisition' }) + # backend.event([{event_type: "email_received", ts: "2023-03-04T12:12:00+1:00", subject: 'Re: Acquisition' }]) # - # @param [Hash] payload event payload + # @param [Array] payload array of event hashes to send + # @raise NotImplementedError def event(payload) raise NotImplementedError, "must define #event on subclass" end diff --git a/lib/honeybadger/config/defaults.rb b/lib/honeybadger/config/defaults.rb index d884f127c..84a4c47d8 100644 --- a/lib/honeybadger/config/defaults.rb +++ b/lib/honeybadger/config/defaults.rb @@ -91,6 +91,16 @@ class Boolean; end default: 100, type: Integer }, + events_batch_size: { + description: 'Send events batch if n events have accumulated', + default: 100, + type: Integer + }, + events_timeout: { + description: 'Timeout after which the events batch will be sent regardless (in milliseconds)', + default: 30_000, + type: Integer + }, plugins: { description: 'An optional list of plugins to load. Default is to load all plugins.', default: nil, diff --git a/lib/honeybadger/events_worker.rb b/lib/honeybadger/events_worker.rb index 33d8a42fd..35ab6ff78 100644 --- a/lib/honeybadger/events_worker.rb +++ b/lib/honeybadger/events_worker.rb @@ -17,6 +17,7 @@ class Thread < ::Thread; end # Used to signal the worker to shutdown. SHUTDOWN = :__hb_worker_shutdown! FLUSH = :__hb_worker_flush! + CHECK_TIMEOUT = :__hb_worker_check_timeout!
# The base number for the exponential backoff formula when calculating the # throttle interval. `1.05 ** throttle` will reach an interval of 2 minutes @@ -24,10 +25,6 @@ class Thread < ::Thread; end BASE_THROTTLE = 1.05 # TODO: These could be configurable? - SEND_TIMEOUT = 30 - MAX_EVENTS = 200 - MAX_EVENTS_SIZE = 400_000 - def initialize(config) @config = config @@ -40,6 +37,11 @@ def initialize(config) @shutdown = false @start_at = nil @pid = Process.pid + @send_queue = [] + @last_sent = nil + + @max_events = config.get(:events_batch_size) + @send_timeout = config.get(:events_timeout) end def push(msg) @@ -103,6 +105,7 @@ def start @pid = Process.pid @thread = Thread.new { run } + @timeout_thread = Thread.new { schedule_timeout_check } end true @@ -110,8 +113,8 @@ def start private - attr_reader :config, :queue, :pid, :mutex, :marker, :thread, :throttle, - :throttle_interval, :start_at + attr_reader :config, :queue, :pid, :mutex, :marker, :thread, :timeout_thread, :throttle, + :throttle_interval, :start_at, :send_queue, :last_sent, :max_events, :send_timeout def_delegator :config, :backend @@ -138,6 +141,7 @@ def kill! if thread Thread.kill(thread) + Thread.kill(timeout_thread) thread.join # Allow ensure blocks to execute. end @@ -154,15 +158,30 @@ def suspend(interval) kill! 
end + def schedule_timeout_check + loop do + sleep(send_timeout / 1000.0) + ms_since = (Time.now.to_f - last_sent.to_f) * 1000.0 + if ms_since >= send_timeout + queue.push(CHECK_TIMEOUT) + end + end + end + def run begin d { 'worker started' } - Thread.current.thread_variable_set(:last_sent, Time.now) - Thread.current.thread_variable_set(:send_queue, []) - + mutex.synchronize do + @last_sent = Time.now + end loop do + # ms_since = (Time.now.to_f - @last_sent.to_f) * 1000.0 + # if ms_since >= send_timeout + # queue.push(CHECK_TIMEOUT) + # end case msg = queue.pop when SHUTDOWN then break + when CHECK_TIMEOUT then check_timeout when FLUSH then flush_send_queue when ConditionVariable then signal_marker(msg) else work(msg) @@ -180,24 +199,45 @@ def run release_marker end + def check_timeout + return if mutex.synchronize { send_queue.empty? } + ms_since = (Time.now.to_f - last_sent.to_f) * 1000.0 + if ms_since >= send_timeout + send_batch + end + end + def enqueue_msg(msg) - queue = Thread.current.thread_variable_get(:send_queue) - queue << msg - # queue_byte_size = Thread.current.thread_variable_get(:send_queue_byte_size) - # size = msg.to_json.bytesize + 1 - # Thread.current.thread_variable_set(:send_queue_byte_size, queue_byte_size + size) + mutex.synchronize do + @send_queue << msg + end end - def check_and_send - send_queue = Thread.current.thread_variable_get(:send_queue) - return if send_queue.empty? - last_sent = Thread.current.thread_variable_get(:last_sent) - if send_queue.length >= MAX_EVENTS || (Time.now.to_i - last_sent.to_i) >= SEND_TIMEOUT - send_now(send_queue) + def send_batch + send_now(mutex.synchronize { send_queue }) + mutex.synchronize do + @last_sent = Time.now send_queue.clear end end + def check_and_send + return if mutex.synchronize { send_queue.empty? } + if mutex.synchronize { send_queue.length } >= max_events + send_batch + end + end + + def flush_send_queue + return if mutex.synchronize { send_queue.empty? 
} + send_batch + rescue StandardError => e + error { + msg = "Error in worker thread class=%s message=%s\n\t%s" + sprintf(msg, e.class, e.message.dump, Array(e.backtrace).join("\n\t")) + } + end + def work(msg) enqueue_msg(msg) check_and_send @@ -216,21 +256,11 @@ def work(msg) } end - def flush_send_queue - send_queue = Thread.current.thread_variable_get(:send_queue) - return if send_queue.empty? - send_now(send_queue) - send_queue.clear - rescue StandardError => e - error { - msg = "Error in worker thread class=%s message=%s\n\t%s" - sprintf(msg, e.class, e.message.dump, Array(e.backtrace).join("\n\t")) - } - end def send_to_backend(msg) d { 'events_worker sending to backend' } - backend.event(msg) + response = backend.event(msg) + response end def calc_throttle_interval diff --git a/spec/unit/honeybadger/events_worker_spec.rb b/spec/unit/honeybadger/events_worker_spec.rb index 753f7232d..d63e8fb22 100644 --- a/spec/unit/honeybadger/events_worker_spec.rb +++ b/spec/unit/honeybadger/events_worker_spec.rb @@ -7,7 +7,13 @@ describe Honeybadger::EventsWorker do let!(:instance) { described_class.new(config) } - let(:config) { Honeybadger::Config.new(logger: NULL_LOGGER, debug: true, backend: 'null') } + let(:config) { + Honeybadger::Config.new( + logger: NULL_LOGGER, debug: true, backend: 'null', + events_batch_size: 5, + events_timeout: 10_000 + ) + } let(:event) { {event_type: "test", ts: "not-important"} } subject { instance } @@ -353,4 +359,37 @@ def handle_response end end end + + describe "batching" do + it "should send after batch size is reached" do + expect(subject.send(:backend)).to receive(:event).with([event] * 5).and_return(Honeybadger::Backend::Null::StubbedResponse.new) + 5.times do + subject.push(event) + end + sleep(0.2) + end + context "timeout" do + let(:config) { + Honeybadger::Config.new( + logger: NULL_LOGGER, debug: true, backend: 'null', + events_batch_size: 5, + events_timeout: 100 + ) + } + + it "should send after timeout when sending 
another" do + expect(subject.send(:backend)).to receive(:event).with([event]).twice().and_return(Honeybadger::Backend::Null::StubbedResponse.new) + subject.push(event) + sleep(0.2) + subject.push(event) + sleep(0.2) + end + + it "should send after timeout without new message" do + expect(subject.send(:backend)).to receive(:event).with([event]).and_return(Honeybadger::Backend::Null::StubbedResponse.new) + subject.push(event) + sleep(0.2) + end + end + end end From 2a734687a2d460ff972b8a7258e31bb5eef26ba1 Mon Sep 17 00:00:00 2001 From: Jan Krutisch Date: Mon, 22 Jan 2024 15:17:21 +0100 Subject: [PATCH 07/11] Remove one timeout check, namespace config --- lib/honeybadger/config/defaults.rb | 4 ++-- lib/honeybadger/events_worker.rb | 11 ++++------- spec/unit/honeybadger/events_worker_spec.rb | 14 ++++---------- 3 files changed, 10 insertions(+), 19 deletions(-) diff --git a/lib/honeybadger/config/defaults.rb b/lib/honeybadger/config/defaults.rb index 84a4c47d8..2d944997c 100644 --- a/lib/honeybadger/config/defaults.rb +++ b/lib/honeybadger/config/defaults.rb @@ -91,12 +91,12 @@ class Boolean; end default: 100, type: Integer }, - events_batch_size: { + :'events.batch_size' => { description: 'Send events batch if n events have accumulated', default: 100, type: Integer }, - events_timeout: { + :'events.timeout' => { description: 'Timeout after which the events batch will be sent regardless (in milliseconds)', default: 30_000, type: Integer diff --git a/lib/honeybadger/events_worker.rb b/lib/honeybadger/events_worker.rb index 35ab6ff78..913288c39 100644 --- a/lib/honeybadger/events_worker.rb +++ b/lib/honeybadger/events_worker.rb @@ -40,8 +40,8 @@ def initialize(config) @send_queue = [] @last_sent = nil - @max_events = config.get(:events_batch_size) - @send_timeout = config.get(:events_timeout) + @max_events = config.get(:'events.batch_size') + @send_timeout = config.get(:'events.timeout') end def push(msg) @@ -161,10 +161,7 @@ def suspend(interval) def 
schedule_timeout_check loop do sleep(send_timeout / 1000.0) - ms_since = (Time.now.to_f - last_sent.to_f) * 1000.0 - if ms_since >= send_timeout - queue.push(CHECK_TIMEOUT) - end + queue.push(CHECK_TIMEOUT) end end @@ -243,7 +240,7 @@ def work(msg) check_and_send if shutdown? && throttled? - warn { sprintf('Unable to semd %s events(s) to Honeybadger (currently throttled)', queue.size) } if queue.size > 1 + warn { sprintf('Unable to send %s events(s) to Honeybadger (currently throttled)', queue.size) } if queue.size > 1 kill! return end diff --git a/spec/unit/honeybadger/events_worker_spec.rb b/spec/unit/honeybadger/events_worker_spec.rb index d63e8fb22..08e0c42a2 100644 --- a/spec/unit/honeybadger/events_worker_spec.rb +++ b/spec/unit/honeybadger/events_worker_spec.rb @@ -10,8 +10,8 @@ let(:config) { Honeybadger::Config.new( logger: NULL_LOGGER, debug: true, backend: 'null', - events_batch_size: 5, - events_timeout: 10_000 + :'events.batch_size' => 5, + :'events.timeout' => 10_000 ) } let(:event) { {event_type: "test", ts: "not-important"} } @@ -73,12 +73,6 @@ def flush flush expect(instance.send(:thread)).to be_alive end - - # it "logs the error" do - # allow(config.logger).to receive(:error) - # expect(config.logger).to receive(:error).with(/error/i) - # flush - # end end describe "#initialize" do @@ -372,8 +366,8 @@ def handle_response let(:config) { Honeybadger::Config.new( logger: NULL_LOGGER, debug: true, backend: 'null', - events_batch_size: 5, - events_timeout: 100 + :'events.batch_size' => 5, + :'events.timeout' => 100 ) } From d29f6787153fb4e7bef0ba2cbff8d14792db90c9 Mon Sep 17 00:00:00 2001 From: Jan Krutisch Date: Tue, 23 Jan 2024 08:55:10 +0100 Subject: [PATCH 08/11] Remove unused code --- lib/honeybadger/events_worker.rb | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lib/honeybadger/events_worker.rb b/lib/honeybadger/events_worker.rb index 913288c39..ddfa69558 100644 --- a/lib/honeybadger/events_worker.rb +++ b/lib/honeybadger/events_worker.rb 
@@ -172,10 +172,6 @@ def run @last_sent = Time.now end loop do - # ms_since = (Time.now.to_f - @last_sent.to_f) * 1000.0 - # if ms_since >= send_timeout - # queue.push(CHECK_TIMEOUT) - # end case msg = queue.pop when SHUTDOWN then break when CHECK_TIMEOUT then check_timeout From 1ce8af9300ddf79d9253ecc3cd620530fee7e973 Mon Sep 17 00:00:00 2001 From: Jan Krutisch Date: Tue, 23 Jan 2024 16:30:31 +0100 Subject: [PATCH 09/11] Add server back end functionality for events This adds a minimal set of tests to ensure API conformance. I've tested the code manually against "the real thing(tm)" --- lib/honeybadger/backend/server.rb | 17 +++++-- lib/honeybadger/util/http.rb | 6 +++ spec/unit/honeybadger/backend/server_spec.rb | 48 ++++++++++++++++++++ spec/unit/honeybadger/util/http_spec.rb | 26 +++++++++++ 4 files changed, 94 insertions(+), 3 deletions(-) diff --git a/lib/honeybadger/backend/server.rb b/lib/honeybadger/backend/server.rb index b042bd4b8..5100a2355 100644 --- a/lib/honeybadger/backend/server.rb +++ b/lib/honeybadger/backend/server.rb @@ -11,11 +11,10 @@ module Backend class Server < Base ENDPOINTS = { notices: '/v1/notices'.freeze, - deploys: '/v1/deploys'.freeze + deploys: '/v1/deploys'.freeze, }.freeze - CHECK_IN_ENDPOINT = '/v1/check_in'.freeze - + EVENTS_ENDPOINT = '/v1/events'.freeze HTTP_ERRORS = Util::HTTP::ERRORS @@ -48,6 +47,18 @@ def check_in(id) Response.new(:error, nil, "HTTP Error: #{e.class}") end + # Send event + # @example + # backend.event([{event_type: "email_received", ts: "2023-03-04T12:12:00+01:00", subject: 'Re: Acquisition' }]) + # + # @param [Array] payload array of event hashes to send + # @return [Response] + def event(payload) + Response.new(@http.post_newline_delimited(EVENTS_ENDPOINT, payload)) + rescue *HTTP_ERRORS => e + Response.new(:error, nil, "HTTP Error: #{e.class}") + end + private def payload_headers(payload) diff --git a/lib/honeybadger/util/http.rb b/lib/honeybadger/util/http.rb index 844729e76..d20304157 100644 ---
a/lib/honeybadger/util/http.rb +++ b/lib/honeybadger/util/http.rb @@ -49,6 +49,12 @@ def post(endpoint, payload, headers = nil) response end + def post_newline_delimited(endpoint, payload, headers = nil) + response = http_connection.post(endpoint, compress(payload.map(&:to_json).join("\n")), http_headers(headers)) + debug { sprintf("http method=POST path=%s code=%d", endpoint.dump, response.code) } + response + end + private attr_reader :config diff --git a/spec/unit/honeybadger/backend/server_spec.rb b/spec/unit/honeybadger/backend/server_spec.rb index f643fe47a..b71c58051 100644 --- a/spec/unit/honeybadger/backend/server_spec.rb +++ b/spec/unit/honeybadger/backend/server_spec.rb @@ -11,6 +11,7 @@ it { should respond_to :notify } it { should respond_to :check_in } + it { should respond_to :event } describe "#check_in" do it "returns a response" do @@ -79,6 +80,53 @@ def notify_backend subject.notify(:notices, payload) end + end + + describe "#event" do + it "returns the response" do + stub_http + expect(send_event).to be_a Honeybadger::Backend::Response + end + + it "adds auth headers" do + http = stub_http + expect(http).to receive(:post).with(anything, anything, hash_including({ 'X-API-Key' => 'abc123'})) + send_event + end + + it "serialises json and compresses" do + http = stub_http + expect(http).to receive(:post) do |path, body, headers| + cleartext_body = Zlib::Inflate.inflate(body) + json = JSON.parse(cleartext_body) + expect(json["ts"]).to_not be_nil + expect(json["event_type"]).to eq("checkout") + expect(json["increment"]).to eq(0) + end + send_event + end + + it "serialises json newline delimited and compresses" do + http = stub_http + expect(http).to receive(:post) do |path, body, headers| + cleartext_body = Zlib::Inflate.inflate(body) + + the_jsons = cleartext_body.split("\n").map { |t| JSON.parse(t) } + expect(the_jsons.length).to eq(2) + expect(the_jsons[0]["ts"]).to_not be_nil + expect(the_jsons[0]["event_type"]).to eq("checkout") + 
expect(the_jsons[0]["sum"]).to eq("123.23") + expect(the_jsons[0]["increment"]).to eq(0) + expect(the_jsons[1]["increment"]).to eq(1) + end + send_event(2) + end + + def send_event(count=1) + payload = [] + count.times {|i| payload << {ts: DateTime.now.new_offset(0).rfc3339, event_type: "checkout", sum: "123.23", increment: i} } + subject.event(payload) + end end end diff --git a/spec/unit/honeybadger/util/http_spec.rb b/spec/unit/honeybadger/util/http_spec.rb index 9e9584013..93da9c7df 100644 --- a/spec/unit/honeybadger/util/http_spec.rb +++ b/spec/unit/honeybadger/util/http_spec.rb @@ -10,6 +10,7 @@ subject { described_class.new(config) } it { should respond_to :post } + it { should respond_to :post_newline_delimited } it { should respond_to :get } it "sends a user agent with version number" do @@ -57,6 +58,11 @@ expect(http_post).to be_a Net::HTTPResponse end + it "returns the response for post_newline_delimited" do + stub_http + expect(http_post_newline_delimited).to be_a Net::HTTPResponse + end + it "returns the response for #get" do stub_http expect(http_get).to be_a Net::HTTPResponse @@ -240,10 +246,30 @@ end end + describe "#post_newline_delimited" do + it "should properly serialize NDJSON and compress" do + http = stub_http + expect(http).to receive(:post) do |path, body, headers| + expect(path).to eq("/v1/foo") + decompressed = Zlib::Inflate.inflate(body) + parts = decompressed.split("\n").map { |part| JSON.parse(part) } + expect(parts.length).to be(2) + + Net::HTTPSuccess.new('1.2', '200', 'OK') + end + http_post_newline_delimited + end + end + def http_post subject.post('/v1/foo', double('Notice', to_json: '{}')) end + def http_post_newline_delimited + ts = DateTime.now.new_offset(0).rfc3339 + subject.post_newline_delimited('/v1/foo', [{ts: ts, event_type: "test"}, {ts: ts, event_type: "test2"}]) + end + def http_get subject.get('/v1/foo') end From 44b1638d6902377b17e81b6927ac99ef8c60a9fa Mon Sep 17 00:00:00 2001 From: Joshua Wood Date: Wed, 24 Jan 2024 
16:16:42 -0800 Subject: [PATCH 10/11] Add events worker to agent stop/flush commands --- lib/honeybadger/agent.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/honeybadger/agent.rb b/lib/honeybadger/agent.rb index af2277842..bff3666d5 100644 --- a/lib/honeybadger/agent.rb +++ b/lib/honeybadger/agent.rb @@ -355,6 +355,7 @@ def flush yield ensure worker.flush + events_worker.flush end # Stops the Honeybadger service. @@ -363,6 +364,7 @@ def flush # Honeybadger.stop # => nil def stop(force = false) worker.shutdown(force) + events_worker.shutdown(force) true end From d2b513e61786e4b3228a2187a785c46a83cea504 Mon Sep 17 00:00:00 2001 From: Jan Krutisch Date: Thu, 25 Jan 2024 11:15:23 +0100 Subject: [PATCH 11/11] Fix debug message in events worker --- lib/honeybadger/events_worker.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/honeybadger/events_worker.rb b/lib/honeybadger/events_worker.rb index ddfa69558..e0b9ff937 100644 --- a/lib/honeybadger/events_worker.rb +++ b/lib/honeybadger/events_worker.rb @@ -60,7 +60,7 @@ def send_now(msg) end def shutdown(force = false) - d { 'shutting down worker' } + d { 'shutting down events worker' } mutex.synchronize do @shutdown = true