diff --git a/logstash-core-queue-jruby/src/main/java/org/logstash/ackedqueue/ext/JrubyAckedQueueExtLibrary.java b/logstash-core-queue-jruby/src/main/java/org/logstash/ackedqueue/ext/JrubyAckedQueueExtLibrary.java index e36f425ee3f..839876bc2f2 100644 --- a/logstash-core-queue-jruby/src/main/java/org/logstash/ackedqueue/ext/JrubyAckedQueueExtLibrary.java +++ b/logstash-core-queue-jruby/src/main/java/org/logstash/ackedqueue/ext/JrubyAckedQueueExtLibrary.java @@ -7,6 +7,7 @@ import org.jruby.RubyFixnum; import org.jruby.RubyModule; import org.jruby.RubyObject; +import org.jruby.RubyBoolean; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.runtime.Arity; @@ -43,7 +44,6 @@ public IRubyObject allocate(Ruby runtime, RubyClass rubyClass) { // as a simplified first prototyping implementation, the Settings class is not exposed and the queue elements // are assumed to be logstash Event. - @JRubyClass(name = "AckedQueue", parent = "Object") public static class RubyAckedQueue extends RubyObject { private Queue queue; @@ -171,6 +171,11 @@ public IRubyObject ruby_read_batch(ThreadContext context, IRubyObject limit, IRu return (b == null) ? context.nil : new JrubyAckedBatchExtLibrary.RubyAckedBatch(context.runtime, b); } + @JRubyMethod(name = "is_fully_acked?") + public IRubyObject ruby_is_fully_acked(ThreadContext context) + { + return RubyBoolean.newBoolean(context.runtime, this.queue.isFullyAcked()); + } @JRubyMethod(name = "close") public IRubyObject ruby_close(ThreadContext context) @@ -183,6 +188,5 @@ public IRubyObject ruby_close(ThreadContext context) return context.nil; } - } } diff --git a/logstash-core-queue-jruby/src/main/java/org/logstash/ackedqueue/ext/JrubyAckedQueueMemoryExtLibrary.java b/logstash-core-queue-jruby/src/main/java/org/logstash/ackedqueue/ext/JrubyAckedQueueMemoryExtLibrary.java index db80f228454..cd53cb0e80c 100644 --- a/logstash-core-queue-jruby/src/main/java/org/logstash/ackedqueue/ext/JrubyAckedQueueMemoryExtLibrary.java +++ b/logstash-core-queue-jruby/src/main/java/org/logstash/ackedqueue/ext/JrubyAckedQueueMemoryExtLibrary.java @@ -7,6 +7,7 @@ import org.jruby.RubyFixnum; import org.jruby.RubyModule; import org.jruby.RubyObject; +import org.jruby.RubyBoolean; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.runtime.Arity; @@ -166,6 +167,11 @@ public IRubyObject ruby_read_batch(ThreadContext context, IRubyObject limit, IRu return (b == null) ? context.nil : new JrubyAckedBatchExtLibrary.RubyAckedBatch(context.runtime, b); } + @JRubyMethod(name = "is_fully_acked?") + public IRubyObject ruby_is_fully_acked(ThreadContext context) + { + return RubyBoolean.newBoolean(context.runtime, this.queue.isFullyAcked()); + } @JRubyMethod(name = "close") public IRubyObject ruby_close(ThreadContext context) diff --git a/logstash-core/lib/logstash/environment.rb b/logstash-core/lib/logstash/environment.rb index 7c2ebed7638..c17bf0e657e 100644 --- a/logstash-core/lib/logstash/environment.rb +++ b/logstash-core/lib/logstash/environment.rb @@ -41,6 +41,7 @@ module Environment Setting::PortRange.new("http.port", 9600..9700), Setting::String.new("http.environment", "production"), Setting::String.new("queue.type", "memory", true, ["persisted", "memory", "memory_acked"]), + Setting::Boolean.new("queue.drain", false), Setting::Bytes.new("queue.page_capacity", "250mb"), Setting::Bytes.new("queue.max_bytes", "1024mb"), Setting::Numeric.new("queue.max_events", 0), # 0 is unlimited diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index b8ec438be93..f473bd715c5 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -154,6 +154,7 @@ def initialize(config_str, settings = SETTINGS, namespaced_metric = nil) @filter_queue_client.set_pipeline_metric( metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :events]) ) + @drain_queue = @settings.get_value("queue.drain") @events_filtered = Concurrent::AtomicFixnum.new(0) @events_consumed = Concurrent::AtomicFixnum.new(0) @@ -326,26 +327,32 @@ def start_workers # Main body of what a worker thread does # Repeatedly takes batches off the queue, filters, then outputs them def worker_loop(batch_size, batch_delay) - running = true + shutdown_requested = false @filter_queue_client.set_batch_dimensions(batch_size, batch_delay) - while running - batch = @filter_queue_client.take_batch + while true signal = @signal_queue.empty? ? NO_SIGNAL : @signal_queue.pop - running = !signal.shutdown? + shutdown_requested |= signal.shutdown? # latch on shutdown signal + batch = @filter_queue_client.read_batch # metrics are started in read_batch @events_consumed.increment(batch.size) - filter_batch(batch) - - if signal.flush? || signal.shutdown? - flush_filters_to_batch(batch, :final => signal.shutdown?) - end - + flush_filters_to_batch(batch, :final => false) if signal.flush? output_batch(batch) @filter_queue_client.close_batch(batch) + + # keep break at end of loop, after the read_batch operation, some pipeline specs rely on this "final read_batch" before shutdown. + break if shutdown_requested && !draining_queue? end + + # we are shutting down, queue is drained if it was required, now perform a final flush. + # for this we need to create a new empty batch to contain the final flushed events + batch = @filter_queue_client.new_batch + @filter_queue_client.start_metrics(batch) # explicitly call start_metrics since we dont do a read_batch here + flush_filters_to_batch(batch, :final => true) + output_batch(batch) + @filter_queue_client.close_batch(batch) end def filter_batch(batch) @@ -604,4 +611,10 @@ def inspect :flushing => @flushing } end -end end + + private + + def draining_queue? + @drain_queue ? !@filter_queue_client.empty? : false + end +end; end diff --git a/logstash-core/lib/logstash/util/wrapped_acked_queue.rb b/logstash-core/lib/logstash/util/wrapped_acked_queue.rb index 5dd40c7b889..2cc591fa011 100644 --- a/logstash-core/lib/logstash/util/wrapped_acked_queue.rb +++ b/logstash-core/lib/logstash/util/wrapped_acked_queue.rb @@ -125,6 +125,10 @@ def close @queue.close end + def empty? + @mutex.synchronize { @queue.is_fully_acked? } + end + def set_batch_dimensions(batch_size, wait_for) @batch_size = batch_size @wait_for = wait_for @@ -157,16 +161,29 @@ def current_inflight_batch @inflight_batches.fetch(Thread.current, []) end - def take_batch + # create a new empty batch + # @return [ReadBatch] a new empty read batch + def new_batch + ReadBatch.new(@queue, @batch_size, @wait_for) + end + + def read_batch if @queue.closed? raise QueueClosedError.new("Attempt to take a batch from a closed AckedQueue") end + + batch = new_batch + @mutex.synchronize { batch.read_next } + start_metrics(batch) + batch + end + + def start_metrics(batch) @mutex.synchronize do - batch = ReadBatch.new(@queue, @batch_size, @wait_for) + # there seems to be concurrency issues with metrics, keep it in the mutex add_starting_metrics(batch) set_current_thread_inflight_batch(batch) start_clock - batch end end @@ -177,21 +194,30 @@ def set_current_thread_inflight_batch(batch) def close_batch(batch) @mutex.synchronize do batch.close + + # there seems to be concurrency issues with metrics, keep it in the mutex @inflight_batches.delete(Thread.current) - stop_clock + stop_clock(batch) end end def start_clock @inflight_clocks[Thread.current] = [ - @event_metric.time(:duration_in_millis), - @pipeline_metric.time(:duration_in_millis) + @event_metric.time(:duration_in_millis), + @pipeline_metric.time(:duration_in_millis) ] end - def stop_clock - @inflight_clocks[Thread.current].each(&:stop) - @inflight_clocks.delete(Thread.current) + def stop_clock(batch) + unless @inflight_clocks[Thread.current].nil? + if batch.size > 0 + # onl/y stop (which also records) the metrics if the batch is non-empty. + # start_clock is now called at empty batch creation and an empty batch could + # stay empty all the way down to the close_batch call. + @inflight_clocks[Thread.current].each(&:stop) + end + @inflight_clocks.delete(Thread.current) + end end def add_starting_metrics(batch) @@ -213,6 +239,10 @@ def add_output_metrics(batch) class ReadBatch def initialize(queue, size, wait) + @queue = queue + @size = size + @wait = wait + @originals = Hash.new # TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor @@ -221,7 +251,13 @@ def initialize(queue, size, wait) @generated = Hash.new @iterating_temp = Hash.new @iterating = false # Atomic Boolean maybe? Although batches are not shared across threads - take_originals_from_queue(queue, size, wait) # this sets a reference to @acked_batch + @acked_batch = nil + end + + def read_next + @acked_batch = @queue.read_batch(@size, @wait) + return if @acked_batch.nil? + @acked_batch.get_elements.each { |e| @originals[e] = true } end def close @@ -301,14 +337,6 @@ def update_generated @generated.update(@iterating_temp) @iterating_temp.clear end - - def take_originals_from_queue(queue, size, wait) - @acked_batch = queue.read_batch(size, wait) - return if @acked_batch.nil? - @acked_batch.get_elements.each do |e| - @originals[e] = true - end - end end class WriteClient diff --git a/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb b/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb index e36d88eb45f..8a86c5cb3b3 100644 --- a/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb +++ b/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb @@ -72,6 +72,10 @@ def close # noop, compat with acked queue read client end + def empty? + true # synchronous queue is alway empty + end + def set_batch_dimensions(batch_size, wait_for) @batch_size = batch_size @wait_for = wait_for @@ -104,18 +108,25 @@ def current_inflight_batch @inflight_batches.fetch(Thread.current, []) end - def take_batch + # create a new empty batch + # @return [ReadBatch] a new empty read batch + def new_batch + ReadBatch.new(@queue, @batch_size, @wait_for) + end + + def read_batch + batch = new_batch + @mutex.synchronize { batch.read_next } + start_metrics(batch) + batch + end + + def start_metrics(batch) @mutex.synchronize do - batch = ReadBatch.new(@queue, @batch_size, @wait_for) + # there seems to be concurrency issues with metrics, keep it in the mutex + add_starting_metrics(batch) set_current_thread_inflight_batch(batch) - - # We dont actually have any events to work on so lets - # not bother with recording metrics for them - if batch.size > 0 - add_starting_metrics(batch) - start_clock - end - batch + start_clock end end @@ -125,8 +136,9 @@ def set_current_thread_inflight_batch(batch) def close_batch(batch) @mutex.synchronize do + # there seems to be concurrency issues with metrics, keep it in the mutex @inflight_batches.delete(Thread.current) - stop_clock + stop_clock(batch) end end @@ -137,9 +149,14 @@ def start_clock ] end - def stop_clock + def stop_clock(batch) unless @inflight_clocks[Thread.current].nil? - @inflight_clocks[Thread.current].each(&:stop) + if batch.size > 0 + # only stop (which also records) the metrics if the batch is non-empty. + # start_clock is now called at empty batch creation and an empty batch could + # stay empty all the way down to the close_batch call. + @inflight_clocks[Thread.current].each(&:stop) + end @inflight_clocks.delete(Thread.current) end end @@ -162,6 +179,10 @@ def add_output_metrics(batch) class ReadBatch def initialize(queue, size, wait) + @queue = queue + @size = size + @wait = wait + @originals = Hash.new # TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor @@ -170,7 +191,16 @@ def initialize(queue, size, wait) @generated = Hash.new @iterating_temp = Hash.new @iterating = false # Atomic Boolean maybe? Although batches are not shared across threads - take_originals_from_queue(queue, size, wait) + @acked_batch = nil + end + + def read_next + @size.times do |t| + event = @queue.poll(@wait) + return if event.nil? # queue poll timed out + + @originals[event] = true + end end def merge(event) @@ -235,15 +265,6 @@ def update_generated @generated.update(@iterating_temp) @iterating_temp.clear end - - def take_originals_from_queue(queue, size, wait) - size.times do |t| - event = queue.poll(wait) - return if event.nil? # queue poll timed out - - @originals[event] = true - end - end end class WriteClient diff --git a/logstash-core/spec/logstash/pipeline_pq_file_spec.rb b/logstash-core/spec/logstash/pipeline_pq_file_spec.rb index b715f457f0d..29130e41580 100644 --- a/logstash-core/spec/logstash/pipeline_pq_file_spec.rb +++ b/logstash-core/spec/logstash/pipeline_pq_file_spec.rb @@ -82,6 +82,13 @@ def close let(:queue_type) { "persisted" } # "memory" "memory_acked" let(:times) { [] } + let(:pipeline_thread) do + # subject has to be called for the first time outside the thread because it will create a race condition + # with the subject.ready? call since subject is lazily initialized + s = subject + Thread.new { s.run } + end + before :each do FileUtils.mkdir_p(this_queue_folder) @@ -97,19 +104,22 @@ def close pipeline_settings.each {|k, v| pipeline_settings_obj.set(k, v) } pipeline_settings_obj.set("queue.page_capacity", page_capacity) pipeline_settings_obj.set("queue.max_bytes", max_bytes) - Thread.new do - # make sure we have received all the generated events - while counting_output.event_count < number_of_events do - sleep 1 - end - subject.shutdown - end times.push(Time.now.to_f) - subject.run + + pipeline_thread + sleep(0.1) until subject.ready? + + # make sure we have received all the generated events + while counting_output.event_count < number_of_events do + sleep(0.5) + end + times.unshift(Time.now.to_f - times.first) end after :each do + subject.shutdown + pipeline_thread.join # Dir.rm_rf(this_queue_folder) end diff --git a/logstash-core/spec/logstash/pipeline_spec.rb b/logstash-core/spec/logstash/pipeline_spec.rb index dd9b9ae980b..0deb7a609ce 100644 --- a/logstash-core/spec/logstash/pipeline_spec.rb +++ b/logstash-core/spec/logstash/pipeline_spec.rb @@ -3,6 +3,7 @@ require "logstash/inputs/generator" require "logstash/filters/multiline" require_relative "../support/mocks_classes" +require_relative "../logstash/pipeline_reporter_spec" # for DummyOutput class class DummyInput < LogStash::Inputs::Base config_name "dummyinput" diff --git a/logstash-core/spec/logstash/util/wrapped_synchronous_queue_spec.rb b/logstash-core/spec/logstash/util/wrapped_synchronous_queue_spec.rb index b9b65fe8886..09968b59cab 100644 --- a/logstash-core/spec/logstash/util/wrapped_synchronous_queue_spec.rb +++ b/logstash-core/spec/logstash/util/wrapped_synchronous_queue_spec.rb @@ -60,7 +60,7 @@ def poll(*) shift(); end context "when the queue is empty" do it "doesnt record the `duration_in_millis`" do - batch = read_client.take_batch + batch = read_client.read_batch read_client.close_batch(batch) store = collector.snapshot_metric.metric_store @@ -95,7 +95,8 @@ def poll(*) shift(); end batch = write_client.get_new_batch 5.times {|i| batch.push("value-#{i}")} write_client.push_batch(batch) - read_batch = read_client.take_batch + + read_batch = read_client.read_batch sleep(0.1) # simulate some work for the `duration_in_millis` # TODO: this interaction should be cleaned in an upcoming PR, # This is what the current pipeline does. @@ -126,7 +127,7 @@ def poll(*) shift(); end batch = write_client.get_new_batch 5.times {|i| batch.push(LogStash::Event.new({"message" => "value-#{i}"}))} write_client.push_batch(batch) - read_batch = read_client.take_batch + read_batch = read_client.read_batch expect(read_batch.size).to eq(5) i = 0 read_batch.each do |data| diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java index bcbfe988aa9..4bcef21c77c 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java @@ -371,6 +371,16 @@ public boolean isFull() { } } + // @return true if the queue is fully acked, which implies that it is fully read which works as an "empty" state. + public boolean isFullyAcked() { + lock.lock(); + try { + return this.tailPages.isEmpty() ? this.headPage.isFullyAcked() : false; + } finally { + lock.unlock(); + } + } + // @param seqNum the element sequence number upper bound for which persistence should be garanteed (by fsync'ing) public void ensurePersistedUpto(long seqNum) throws IOException{ lock.lock();