queue drain option support #6433

Merged: 1 commit, Mar 1, 2017
@@ -7,6 +7,7 @@
import org.jruby.RubyFixnum;
import org.jruby.RubyModule;
import org.jruby.RubyObject;
import org.jruby.RubyBoolean;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.Arity;
@@ -43,7 +44,6 @@ public IRubyObject allocate(Ruby runtime, RubyClass rubyClass) {
// as a simplified first prototyping implementation, the Settings class is not exposed and the queue elements
// are assumed to be logstash Event.


@JRubyClass(name = "AckedQueue", parent = "Object")
public static class RubyAckedQueue extends RubyObject {
private Queue queue;
@@ -171,6 +171,11 @@ public IRubyObject ruby_read_batch(ThreadContext context, IRubyObject limit, IRu
return (b == null) ? context.nil : new JrubyAckedBatchExtLibrary.RubyAckedBatch(context.runtime, b);
}

@JRubyMethod(name = "is_fully_acked?")
public IRubyObject ruby_is_fully_acked(ThreadContext context)
{
return RubyBoolean.newBoolean(context.runtime, this.queue.isFullyAcked());
}

@JRubyMethod(name = "close")
public IRubyObject ruby_close(ThreadContext context)
@@ -183,6 +188,5 @@ public IRubyObject ruby_close(ThreadContext context)

return context.nil;
}

}
}
@@ -7,6 +7,7 @@
import org.jruby.RubyFixnum;
import org.jruby.RubyModule;
import org.jruby.RubyObject;
import org.jruby.RubyBoolean;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.Arity;
@@ -166,6 +167,11 @@ public IRubyObject ruby_read_batch(ThreadContext context, IRubyObject limit, IRu
return (b == null) ? context.nil : new JrubyAckedBatchExtLibrary.RubyAckedBatch(context.runtime, b);
}

@JRubyMethod(name = "is_fully_acked?")
public IRubyObject ruby_is_fully_acked(ThreadContext context)
{
return RubyBoolean.newBoolean(context.runtime, this.queue.isFullyAcked());
}

@JRubyMethod(name = "close")
public IRubyObject ruby_close(ThreadContext context)
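Both JRuby extensions above expose the Java-side Queue#isFullyAcked() to Ruby as is_fully_acked?. A minimal sketch of the intended call pattern (construction of the queue handle is elided; this snippet is illustrative and not part of the diff):

# given an already-constructed acked queue instance
queue.is_fully_acked?  # => true once every element read from the queue has been acked, i.e. the queue is drained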
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/environment.rb
@@ -41,6 +41,7 @@ module Environment
Setting::PortRange.new("http.port", 9600..9700),
Setting::String.new("http.environment", "production"),
Setting::String.new("queue.type", "memory", true, ["persisted", "memory", "memory_acked"]),
Setting::Boolean.new("queue.drain", false),
Setting::Bytes.new("queue.page_capacity", "250mb"),
Setting::Bytes.new("queue.max_bytes", "1024mb"),
Setting::Numeric.new("queue.max_events", 0), # 0 is unlimited
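With the setting registered above, draining is requested from logstash.yml like any other queue option; it defaults to false. A minimal sketch (queue.type is an existing setting shown only for context):

queue.type: persisted   # drain only has a practical effect for the persisted (acked) queue
queue.drain: true       # on shutdown, keep processing batches until the queue is fully acked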
35 changes: 24 additions & 11 deletions logstash-core/lib/logstash/pipeline.rb
@@ -154,6 +154,7 @@ def initialize(config_str, settings = SETTINGS, namespaced_metric = nil)
@filter_queue_client.set_pipeline_metric(
metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :events])
)
@drain_queue = @settings.get_value("queue.drain")

@events_filtered = Concurrent::AtomicFixnum.new(0)
@events_consumed = Concurrent::AtomicFixnum.new(0)
@@ -326,26 +327,32 @@ def start_workers
# Main body of what a worker thread does
# Repeatedly takes batches off the queue, filters, then outputs them
def worker_loop(batch_size, batch_delay)
running = true
shutdown_requested = false

@filter_queue_client.set_batch_dimensions(batch_size, batch_delay)

while running
batch = @filter_queue_client.take_batch
while true
signal = @signal_queue.empty? ? NO_SIGNAL : @signal_queue.pop
running = !signal.shutdown?
shutdown_requested |= signal.shutdown? # latch on shutdown signal

batch = @filter_queue_client.read_batch # metrics are started in read_batch
@events_consumed.increment(batch.size)

filter_batch(batch)

if signal.flush? || signal.shutdown?
flush_filters_to_batch(batch, :final => signal.shutdown?)
end

flush_filters_to_batch(batch, :final => false) if signal.flush?
output_batch(batch)
@filter_queue_client.close_batch(batch)

# keep the break at the end of the loop, after the read_batch operation; some pipeline specs rely on this "final read_batch" before shutdown.
break if shutdown_requested && !draining_queue?
end

# we are shutting down; the queue has been drained if that was required. Now perform a final flush,
# using a new empty batch to hold the final flushed events.
batch = @filter_queue_client.new_batch
@filter_queue_client.start_metrics(batch) # explicitly call start_metrics since we don't do a read_batch here
flush_filters_to_batch(batch, :final => true)
output_batch(batch)
@filter_queue_client.close_batch(batch)
end

def filter_batch(batch)
@@ -604,4 +611,10 @@ def inspect
:flushing => @flushing
}
end
end end

private

def draining_queue?
@drain_queue ? !@filter_queue_client.empty? : false
end
end; end
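Condensed, the reworked worker_loop latches the shutdown signal and keeps reading batches until draining_queue? returns false. A simplified sketch of that control flow (metrics updates and the periodic flush are omitted; see the diff above for the full version):

shutdown_requested = false
while true
  signal = @signal_queue.empty? ? NO_SIGNAL : @signal_queue.pop
  shutdown_requested |= signal.shutdown?           # latch: a shutdown request is never lost
  batch = @filter_queue_client.read_batch          # read_batch also starts the batch metrics
  filter_batch(batch)
  output_batch(batch)
  @filter_queue_client.close_batch(batch)
  break if shutdown_requested && !draining_queue?  # with queue.drain, loop until the queue is empty
end
# after the loop a final flush runs against a fresh batch from new_batch, as shown above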
64 changes: 46 additions & 18 deletions logstash-core/lib/logstash/util/wrapped_acked_queue.rb
@@ -125,6 +125,10 @@ def close
@queue.close
end

def empty?
@mutex.synchronize { @queue.is_fully_acked? }
end

def set_batch_dimensions(batch_size, wait_for)
@batch_size = batch_size
@wait_for = wait_for
@@ -157,16 +161,29 @@ def current_inflight_batch
@inflight_batches.fetch(Thread.current, [])
end

def take_batch
# create a new empty batch
# @return [ReadBatch] a new empty read batch
def new_batch
ReadBatch.new(@queue, @batch_size, @wait_for)
end

def read_batch
if @queue.closed?
raise QueueClosedError.new("Attempt to take a batch from a closed AckedQueue")
end

batch = new_batch
@mutex.synchronize { batch.read_next }
start_metrics(batch)
batch
end

def start_metrics(batch)
@mutex.synchronize do
batch = ReadBatch.new(@queue, @batch_size, @wait_for)
# there seem to be concurrency issues with metrics, keep it in the mutex
add_starting_metrics(batch)
set_current_thread_inflight_batch(batch)
start_clock
batch
end
end

Expand All @@ -177,21 +194,30 @@ def set_current_thread_inflight_batch(batch)
def close_batch(batch)
@mutex.synchronize do
batch.close

# there seem to be concurrency issues with metrics, keep it in the mutex
@inflight_batches.delete(Thread.current)
stop_clock
stop_clock(batch)
end
end

def start_clock
@inflight_clocks[Thread.current] = [
@event_metric.time(:duration_in_millis),
@pipeline_metric.time(:duration_in_millis)
@event_metric.time(:duration_in_millis),
@pipeline_metric.time(:duration_in_millis)
]
end

def stop_clock
@inflight_clocks[Thread.current].each(&:stop)
@inflight_clocks.delete(Thread.current)
def stop_clock(batch)
unless @inflight_clocks[Thread.current].nil?
if batch.size > 0
# only stop (which also records) the metrics if the batch is non-empty.
# start_clock is now called at empty batch creation and an empty batch could
# stay empty all the way down to the close_batch call.
@inflight_clocks[Thread.current].each(&:stop)
end
@inflight_clocks.delete(Thread.current)
end
end

def add_starting_metrics(batch)
@@ -213,6 +239,10 @@ def add_output_metrics(batch)

class ReadBatch
def initialize(queue, size, wait)
@queue = queue
@size = size
@wait = wait

@originals = Hash.new

# TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
Expand All @@ -221,7 +251,13 @@ def initialize(queue, size, wait)
@generated = Hash.new
@iterating_temp = Hash.new
@iterating = false # Atomic Boolean maybe? Although batches are not shared across threads
take_originals_from_queue(queue, size, wait) # this sets a reference to @acked_batch
@acked_batch = nil
end

def read_next
@acked_batch = @queue.read_batch(@size, @wait)
return if @acked_batch.nil?
@acked_batch.get_elements.each { |e| @originals[e] = true }
end

def close
Expand Down Expand Up @@ -301,14 +337,6 @@ def update_generated
@generated.update(@iterating_temp)
@iterating_temp.clear
end

def take_originals_from_queue(queue, size, wait)
@acked_batch = queue.read_batch(size, wait)
return if @acked_batch.nil?
@acked_batch.get_elements.each do |e|
@originals[e] = true
end
end
end

class WriteClient
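The read client API is split so a caller can obtain an empty batch without touching the queue, which is what the pipeline's final flush needs. A rough sketch of the two call patterns (client stands for the object returned by the acked queue's read client accessor; illustrative only):

# normal path: read_batch pulls events from the queue and starts metrics itself
batch = client.read_batch
# ... filter_batch / output_batch ...
client.close_batch(batch)

# final-flush path: build an empty batch and start metrics explicitly
batch = client.new_batch
client.start_metrics(batch)
# ... flush_filters_to_batch(batch, :final => true) / output_batch ...
client.close_batch(batch)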
67 changes: 44 additions & 23 deletions logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb
@@ -72,6 +72,10 @@ def close
# noop, compat with acked queue read client
end

def empty?
true # synchronous queue is always empty
end

def set_batch_dimensions(batch_size, wait_for)
@batch_size = batch_size
@wait_for = wait_for
@@ -104,18 +108,25 @@ def current_inflight_batch
@inflight_batches.fetch(Thread.current, [])
end

def take_batch
# create a new empty batch
# @return [ReadBatch] a new empty read batch
def new_batch
ReadBatch.new(@queue, @batch_size, @wait_for)
end

def read_batch
batch = new_batch
@mutex.synchronize { batch.read_next }
start_metrics(batch)
batch
end

def start_metrics(batch)
@mutex.synchronize do
batch = ReadBatch.new(@queue, @batch_size, @wait_for)
# there seem to be concurrency issues with metrics, keep it in the mutex
add_starting_metrics(batch)
set_current_thread_inflight_batch(batch)

# We don't actually have any events to work on, so let's
# not bother with recording metrics for them
if batch.size > 0
add_starting_metrics(batch)
start_clock
end
batch
start_clock
end
end

Expand All @@ -125,8 +136,9 @@ def set_current_thread_inflight_batch(batch)

def close_batch(batch)
@mutex.synchronize do
# there seem to be concurrency issues with metrics, keep it in the mutex
@inflight_batches.delete(Thread.current)
stop_clock
stop_clock(batch)
end
end

@@ -137,9 +149,14 @@ def start_clock
]
end

def stop_clock
def stop_clock(batch)
unless @inflight_clocks[Thread.current].nil?
@inflight_clocks[Thread.current].each(&:stop)
if batch.size > 0
# only stop (which also records) the metrics if the batch is non-empty.
# start_clock is now called at empty batch creation and an empty batch could
# stay empty all the way down to the close_batch call.
@inflight_clocks[Thread.current].each(&:stop)
end
@inflight_clocks.delete(Thread.current)
end
end
@@ -162,6 +179,10 @@ def add_output_metrics(batch)

class ReadBatch
def initialize(queue, size, wait)
@queue = queue
@size = size
@wait = wait

@originals = Hash.new

# TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
Expand All @@ -170,7 +191,16 @@ def initialize(queue, size, wait)
@generated = Hash.new
@iterating_temp = Hash.new
@iterating = false # Atomic Boolean maybe? Although batches are not shared across threads
take_originals_from_queue(queue, size, wait)
@acked_batch = nil
end

def read_next
@size.times do |t|
event = @queue.poll(@wait)
return if event.nil? # queue poll timed out

@originals[event] = true
end
end

def merge(event)
@@ -235,15 +265,6 @@ def update_generated
@generated.update(@iterating_temp)
@iterating_temp.clear
end

def take_originals_from_queue(queue, size, wait)
size.times do |t|
event = queue.poll(wait)
return if event.nil? # queue poll timed out

@originals[event] = true
end
end
end

class WriteClient
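Because the in-memory synchronous queue reports empty? as always true, enabling queue.drain never delays shutdown for the memory queue type. A tiny illustration (the read client accessor wiring is an assumption here, not shown in this diff):

client = LogStash::Util::WrappedSynchronousQueue.new.read_client  # assumed wiring
client.empty?  # => true, unconditionally
# hence draining_queue? == (@drain_queue ? !client.empty? : false) == false for this queue type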