Skip to content

Commit

Permalink
fix: Wake up if close, don't sleep if there's work
Browse files Browse the repository at this point in the history
  • Loading branch information
astubbs committed Sep 2, 2021
1 parent d0b6a9a commit 01096c7
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public abstract class AbstractParallelEoSStreamProcessor<K, V> implements Parall
private WallClock clock = new WallClock();

/**
* Kafka's default auto commit frequency
* Kafka's default auto commit frequency. https://docs.confluent.io/platform/current/clients/consumer.html#id1
*/
private static final int KAFKA_DEFAULT_AUTO_COMMIT_FREQUENCY = 5000;

Expand Down Expand Up @@ -114,7 +114,7 @@ public abstract class AbstractParallelEoSStreamProcessor<K, V> implements Parall
private Thread blockableControlThread;

/**
* @see #notifyNewWorkRegistered
* @see #notifySomethingToDo
* @see #processWorkCompleteMailBox
*/
private final AtomicBoolean currentlyPollingWorkCompleteMailBox = new AtomicBoolean();
Expand Down Expand Up @@ -314,7 +314,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
log.info("Assigned {} total ({} new) partition(s) {}", numberOfAssignedPartitions, partitions.size(), partitions);
wm.onPartitionsAssigned(partitions);
usersConsumerRebalanceListener.ifPresent(x -> x.onPartitionsAssigned(partitions));
notifyNewWorkRegistered();
notifySomethingToDo();
}

/**
Expand Down Expand Up @@ -504,6 +504,7 @@ private boolean isRecordsAwaitingProcessing() {
private void transitionToDraining() {
log.debug("Transitioning to draining...");
this.state = State.draining;
notifySomethingToDo();
}

/**
Expand Down Expand Up @@ -756,6 +757,7 @@ private void transitionToClosing() {
} else {
state = State.closing;
}
notifySomethingToDo();
}

/**
Expand All @@ -771,7 +773,8 @@ private void processWorkCompleteMailBox() {
// blocking get the head of the queue
WorkContainer<K, V> firstBlockingPoll = null;
try {
boolean noWorkToDoAndStillRunning = workMailBox.isEmpty() && state.equals(running);
boolean workAvailable = wm.hasWorkInMailboxes() && wm.isSystemIdle();
boolean noWorkToDoAndStillRunning = workMailBox.isEmpty() && state.equals(running) && !workAvailable;
if (noWorkToDoAndStillRunning) {
if (log.isDebugEnabled()) {
log.debug("Blocking poll on work until next scheduled offset commit attempt for {}. active threads: {}, queue: {}",
Expand Down Expand Up @@ -987,7 +990,7 @@ protected void addToMailbox(WorkContainer<K, V> wc) {
* @see #processWorkCompleteMailBox
* @see #blockableControlThread
*/
public void notifyNewWorkRegistered() {
public void notifySomethingToDo() {
if (currentlyPollingWorkCompleteMailBox.get()) {
boolean noTransactionInProgress = !producerManager.map(ProducerManager::isTransactionInProgress).orElse(false);
if (noTransactionInProgress) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private boolean controlLoop() {
// notify control work has been registered, in case it's sleeping waiting for work that will never come
if (!wm.hasWorkInFlight()) {
log.trace("Apparently no work is being done, make sure Control is awake to receive messages");
pc.notifyNewWorkRegistered();
pc.notifySomethingToDo();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ public boolean shouldThrottle() {

/**
* @return true if there's enough messages downloaded from the broker already to satisfy the pipeline, false if more
* should be downloaded (or pipelined in the Consumer)
* should be downloaded (or pipelined in the Consumer)
*/
public boolean isSufficientlyLoaded() {
return getWorkQueuedInMailboxCount() > options.getMaxConcurrency() * getLoadingFactor();
Expand Down Expand Up @@ -615,4 +615,7 @@ public void handleFutureResult(WorkContainer<K, V> wc) {
}
}

public boolean isSystemIdle() {
return getNumberRecordsOutForProcessing() == 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ public abstract class AbstractParallelEoSStreamProcessorTestBase {

/**
* The frequency with which we pretend to poll the broker for records - actually the pretend long poll timeout. A
* lower value shouldn't affect test speed much unless many different batches of messages are "published".
* lower value shouldn't affect test speed much unless many different batches of messages are "published" as test
* messages are queued up at the beginning and the polled.
*
* @see LongPollingMockConsumer#poll(Duration)
*/
public static final int DEFAULT_BROKER_POLL_FREQUENCY_MS = 100;
public static final int DEFAULT_BROKER_POLL_FREQUENCY_MS = 500;

/**
* The commit interval for the main {@link AbstractParallelEoSStreamProcessor} control thread. Actually the timeout
Expand Down Expand Up @@ -137,7 +138,10 @@ public void close() {
// don't try to close if error'd (at least one test purposefully creates an error to tests error handling) - we
// don't want to bubble up an error here that we expect from here.
if (!parentParallelConsumer.isClosedOrFailed()) {
log.debug("Test finished, closing pc...");
parentParallelConsumer.close();
} else {
log.debug("Test finished, pc already closed.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ void consumeFlowDoesntRequireProducer(CommitMode commitMode) {
setupData();

parallelConsumer.poll((ignore) -> {
log.debug("rec: {}", ignore);
log.debug("Test record processor - rec: {}", ignore);
});

//
Expand Down
12 changes: 6 additions & 6 deletions parallel-consumer-core/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,28 @@
<!-- <pattern>%highlight(%-5level) %d{yyyy-MM-dd'T'HH:mm:ss.SSS} %yellow([%thread]) %blue(%logger{36}\(%class{0}.java:%line\)) %msg%n</pattern>-->
<!-- <pattern>%highlight(%-5level) %d{yyyy-MM-dd'T'HH:mm:ss.SSS} %yellow([%thread]) %blue(%logger{36}\(%file:%line\)) %msg%n</pattern>-->
<!-- <pattern>%highlight(%-5level) %yellow([%thread]) %blue(\(%file:%line\)) %cyan(#%M) %msg%n</pattern>-->
<pattern>%d{mm:ss.SSS} %X{pcId} %highlight(%-5level) %yellow([%thread]) %X{offset} %cyan(\(%file:%line\)#%M) %msg%n
</pattern>
<pattern>%d{mm:ss.SSS} %X{pcId} %highlight(%-5level) %yellow([%thread]) %X{offset} %cyan(\(%file:%line\)#%M) %msg%n</pattern>
<!-- <pattern>%highlight(%-5level) %yellow([%thread]) %cyan(\(%logger{36}:%line#%M\)) %msg%n</pattern>-->
</encoder>
</appender>

<root level="warn">
<!-- <root level="debug">-->
<!-- <root level="debug">-->
<appender-ref ref="STDOUT"/>
</root>

<!-- primary -->
<logger name="io.confluent.parallelconsumer" level="info"/>
<!-- <logger name="io.confluent.parallelconsumer" level="debug"/>-->
<!-- <logger name="io.confluent.csid" level="debug"/>-->
<!-- <logger name="io.confluent.parallelconsumer" level="trace"/>-->
<!-- <logger name="io.confluent.parallelconsumer" level="error"/>-->
<!-- <logger name="io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor" level="trace"/>-->
<!-- <logger name="io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor" level="trace"/>-->
<!-- <logger name="io.confluent.parallelconsumer.internal.BrokerPollSystem" level="trace"/>-->
<!-- <logger name="io.confluent.parallelconsumer.integrationTests.BrokerIntegrationTest" level="info"/> &lt;!&ndash; docker logs &ndash;&gt;-->
<!-- <logger name="io.confluent.csid" level="info"/>-->
<!-- <logger name="io.confluent.csid.utils" level="debug"/>-->
<!-- <logger name="io.confluent.csid" level="debug"/>-->
<!-- <logger name="io.confluent.csid.utils" level="debug"/>-->


<!-- <logger name="io.confluent.parallelconsumer" level="trace" />-->

Expand Down

0 comments on commit 01096c7

Please sign in to comment.