Skip to content

Commit

Permalink
fix empty queue condition for queue shutdown drain, PR #7575
Browse files Browse the repository at this point in the history
page test

queue test

move is_empty? at queue level

new wrapped acked queue spec
  • Loading branch information
colinsurprenant committed Jul 18, 2017
1 parent f3f24dc commit aedf397
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 1 deletion.
6 changes: 5 additions & 1 deletion logstash-core/lib/logstash/util/wrapped_acked_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ def check_closed(action)
end
end

def is_empty?
@queue.is_empty?
end

def close
@queue.close
@closed.make_true
Expand Down Expand Up @@ -129,7 +133,7 @@ def close
def empty?
@mutex.lock
begin
@queue.queue.is_fully_acked?
@queue.is_empty?
ensure
@mutex.unlock
end
Expand Down
63 changes: 63 additions & 0 deletions logstash-core/spec/logstash/util/wrapped_acked_queue_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# encoding: utf-8
require "spec_helper"
require "logstash/util/wrapped_acked_queue"

describe LogStash::Util::WrappedAckedQueue do
shared_examples "queue tests" do
it "is_empty? on creation" do
expect(queue.is_empty?).to be_truthy
end

it "not is_empty? after pushing an element" do
queue.push(LogStash::Event.new)
expect(queue.is_empty?).to be_falsey
end

it "not is_empty? when all elements are not acked" do
queue.push(LogStash::Event.new)
batch = queue.read_batch(1, 250)
expect(batch.get_elements.size).to eq(1)
expect(queue.is_empty?).to be_falsey
end

it "is_empty? when all elements are acked" do
queue.push(LogStash::Event.new)
batch = queue.read_batch(1, 250)
expect(batch.get_elements.size).to eq(1)
expect(queue.is_empty?).to be_falsey
batch.close
expect(queue.is_empty?).to be_truthy
end
end

context "memory" do
let(:page_capacity) { 1024 }
let(:max_events) { 0 }
let(:max_bytes) { 0 }
let(:path) { Stud::Temporary.directory }
let(:queue) { LogStash::Util::WrappedAckedQueue.create_memory_based(path, page_capacity, max_events, max_bytes) }

after do
queue.close
end

include_examples "queue tests"
end

context "persisted" do
let(:page_capacity) { 1024 }
let(:max_events) { 0 }
let(:max_bytes) { 0 }
let(:checkpoint_acks) { 1024 }
let(:checkpoint_writes) { 1024 }
let(:checkpoint_interval) { 0 }
let(:path) { Stud::Temporary.directory }
let(:queue) { LogStash::Util::WrappedAckedQueue.create_file_based(path, page_capacity, max_events, checkpoint_acks, checkpoint_writes, checkpoint_interval, max_bytes) }

after do
queue.close
end

include_examples "queue tests"
end
end
11 changes: 11 additions & 0 deletions logstash-core/src/main/java/org/logstash/ackedqueue/Page.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ public Batch readBatch(int limit) throws IOException {
return new Batch(deserialized, serialized.getSeqNums(), this.queue);
}

/**
* Page is considered empty if it does not contain any element or if all elements are acked.
*
* TODO: note that this should be the same as isFullyAcked once fixed per https://github.com/elastic/logstash/issues/7570
*
* @return true if the page has no element or if all elements are acked.
*/
public boolean isEmpty() {
return this.elementCount == 0 || isFullyAcked();
}

public boolean isFullyRead() {
return unreadCount() <= 0;
// return this.elementCount <= 0 || this.firstUnreadSeqNum > maxSeqNum();
Expand Down
18 changes: 18 additions & 0 deletions logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,24 @@ public boolean isFull() {
}
}

/**
* Queue is considered empty if it does not contain any tail page and the headpage has no element or all
* elements are acked
*
* TODO: note that this should be the same as isFullyAcked once fixed per https://github.com/elastic/logstash/issues/7570
*
* @return true if the queue has no tail page and the head page is empty.
*/
public boolean isEmpty() {
lock.lock();
try {
return this.tailPages.isEmpty() && this.headPage.isEmpty();
} finally {
lock.unlock();
}

}

// @return true if the queue is fully acked, which implies that it is fully read which works as an "empty" state.
public boolean isFullyAcked() {
lock.lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ public IRubyObject ruby_is_fully_acked(ThreadContext context)
return RubyBoolean.newBoolean(context.runtime, this.queue.isFullyAcked());
}

@JRubyMethod(name = "is_empty?")
public IRubyObject ruby_is_empty(ThreadContext context)
{
return RubyBoolean.newBoolean(context.runtime, this.queue.isEmpty());
}

@JRubyMethod(name = "close")
public IRubyObject ruby_close(ThreadContext context)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ public IRubyObject ruby_is_fully_acked(ThreadContext context)
return RubyBoolean.newBoolean(context.runtime, this.queue.isFullyAcked());
}

@JRubyMethod(name = "is_empty?")
public IRubyObject ruby_is_empty(ThreadContext context)
{
return RubyBoolean.newBoolean(context.runtime, this.queue.isEmpty());
}

@JRubyMethod(name = "close")
public IRubyObject ruby_close(ThreadContext context)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,25 @@ public void pageWriteAndReadSingle() throws IOException {
}
}

@Test
public void inEmpty() throws IOException {
Queueable element = new StringElement("foobarbaz");

Settings s = TestSettings.volatileQueueSettings(1000);
try(Queue q = new Queue(s)) {
q.open();
HeadPage p = q.headPage;

assertThat(p.isEmpty(), is(true));
p.write(element.serialize(), 1, 1);
assertThat(p.isEmpty(), is(false));
Batch b = q.readBatch(1);
assertThat(p.isEmpty(), is(false));
b.close();
assertThat(p.isEmpty(), is(true));
}
}

@Test
public void pageWriteAndReadMulti() throws IOException {
long seqNum = 1L;
Expand Down
18 changes: 18 additions & 0 deletions logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -764,4 +764,22 @@ private void stableUnderStress(final int capacity) throws IOException {
);
}
}

@Test
public void inEmpty() throws IOException {
try(Queue q = new Queue(TestSettings.volatileQueueSettings(1000))) {
q.open();
assertThat(q.isEmpty(), is(true));

q.write(new StringElement("foobarbaz"));
assertThat(q.isEmpty(), is(false));

Batch b = q.readBatch(1);
assertThat(q.isEmpty(), is(false));

b.close();
assertThat(q.isEmpty(), is(true));
}
}

}

0 comments on commit aedf397

Please sign in to comment.