Skip to content

Commit 2559fc1

Browse files
authored
NIFI-14467 Stop ConsumeBoxEnterpriseEvents when not scheduled (#9873)
Signed-off-by: David Handermann <[email protected]>
1 parent ad3dc61 commit 2559fc1

File tree

2 files changed

+89
-27
lines changed

2 files changed

+89
-27
lines changed

nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEvents.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -179,13 +179,18 @@ private String retrieveLatestStreamPosition() {
179179

180180
@Override
181181
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
182-
while (true) {
182+
while (isScheduled()) {
183+
getLogger().debug("Consuming Box Events from position: {}", streamPosition);
184+
183185
final EventLog eventLog = getEventLog(streamPosition);
184186
streamPosition = eventLog.getNextStreamPosition();
187+
188+
getLogger().debug("Consumed {} Box Enterprise Events. New position: {}", eventLog.getSize(), streamPosition);
189+
185190
writeStreamPosition(streamPosition, session);
186191

187192
if (eventLog.getSize() == 0) {
188-
return;
193+
break;
189194
}
190195

191196
writeLogAsRecords(eventLog, session);

nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEventsTest.java

+82-25
Original file line numberDiff line numberDiff line change
@@ -25,35 +25,38 @@
2525
import org.apache.nifi.util.TestRunners;
2626
import org.jetbrains.annotations.Nullable;
2727
import org.junit.jupiter.api.BeforeEach;
28+
import org.junit.jupiter.api.Test;
2829
import org.junit.jupiter.params.ParameterizedTest;
2930
import org.junit.jupiter.params.provider.Arguments;
3031
import org.junit.jupiter.params.provider.MethodSource;
3132

3233
import java.util.ArrayList;
3334
import java.util.List;
35+
import java.util.concurrent.CountDownLatch;
36+
import java.util.concurrent.ExecutorService;
37+
import java.util.concurrent.Executors;
38+
import java.util.concurrent.Future;
39+
import java.util.concurrent.TimeUnit;
40+
import java.util.concurrent.atomic.AtomicInteger;
41+
import java.util.function.Function;
3442
import java.util.stream.Stream;
3543

3644
import static java.util.Collections.emptyList;
45+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
3746
import static org.junit.jupiter.api.Assertions.assertEquals;
47+
import static org.junit.jupiter.api.Assertions.assertTrue;
3848
import static org.junit.jupiter.params.provider.Arguments.arguments;
3949
import static org.mockito.Mockito.lenient;
4050
import static org.mockito.Mockito.mock;
4151
import static org.mockito.Mockito.when;
4252

4353
class ConsumeBoxEnterpriseEventsTest extends AbstractBoxFileTest {
4454

45-
private TestEventStream eventStream;
55+
private TestConsumeBoxEnterpriseEvents processor;
4656

4757
@BeforeEach
4858
void setUp() throws Exception {
49-
eventStream = new TestEventStream();
50-
51-
final ConsumeBoxEnterpriseEvents processor = new ConsumeBoxEnterpriseEvents() {
52-
@Override
53-
EventLog getEventLog(String position) {
54-
return eventStream.consume(position);
55-
}
56-
};
59+
processor = new TestConsumeBoxEnterpriseEvents();
5760

5861
testRunner = TestRunners.newTestRunner(processor);
5962
super.setUp();
@@ -71,6 +74,9 @@ void testConsumeEvents(
7174
testRunner.setProperty(ConsumeBoxEnterpriseEvents.START_OFFSET, startOffset);
7275
}
7376

77+
final TestEventStream eventStream = new TestEventStream();
78+
processor.overrideGetEventLog(eventStream::consume);
79+
7480
eventStream.addEvent(0);
7581
eventStream.addEvent(1);
7682
eventStream.addEvent(2);
@@ -97,6 +103,39 @@ static List<Arguments> dataFor_testConsumeEvents() {
97103
);
98104
}
99105

106+
@Test
107+
void testGracefulTermination() throws InterruptedException {
108+
final CountDownLatch scheduledLatch = new CountDownLatch(1);
109+
final AtomicInteger consumedEvents = new AtomicInteger(0);
110+
111+
// Infinite stream.
112+
processor.overrideGetEventLog(__ -> {
113+
scheduledLatch.countDown();
114+
consumedEvents.incrementAndGet();
115+
return createEventLog(List.of(createBoxEvent(1)), "");
116+
});
117+
118+
final ExecutorService runExecutor = Executors.newSingleThreadExecutor();
119+
120+
try {
121+
// Starting the processor that consumes an infinite stream.
122+
final Future<?> runFuture = runExecutor.submit(() -> testRunner.run(/*iterations=*/ 1, /*stopOnFinish=*/ false));
123+
124+
assertTrue(scheduledLatch.await(5, TimeUnit.SECONDS), "Processor did not start");
125+
126+
// Triggering the processor to stop.
127+
testRunner.unSchedule();
128+
129+
assertDoesNotThrow(() -> runFuture.get(5, TimeUnit.SECONDS), "Processor did not stop gracefully");
130+
131+
testRunner.assertAllFlowFilesTransferred(ConsumeBoxEnterpriseEvents.REL_SUCCESS, consumedEvents.get());
132+
} finally {
133+
// We can't use try with resources, as Executors use a shutdown method
134+
// which indefinitely waits for submitted tasks.
135+
runExecutor.shutdownNow();
136+
}
137+
}
138+
100139
private Stream<Integer> extractEventIds(final MockFlowFile flowFile) {
101140
final JsonValue json = Json.parse(flowFile.getContent());
102141
return json.asArray().values().stream()
@@ -105,46 +144,64 @@ private Stream<Integer> extractEventIds(final MockFlowFile flowFile) {
105144
.map(Integer::parseInt);
106145
}
107146

147+
/**
148+
* This class is used to override external call in {@link ConsumeBoxEnterpriseEvents#getEventLog(String)}.
149+
*/
150+
private static class TestConsumeBoxEnterpriseEvents extends ConsumeBoxEnterpriseEvents {
151+
152+
private volatile Function<String, EventLog> fakeEventLog;
153+
154+
void overrideGetEventLog(final Function<String, EventLog> fakeEventLog) {
155+
this.fakeEventLog = fakeEventLog;
156+
}
157+
158+
@Override
159+
EventLog getEventLog(String position) {
160+
return fakeEventLog.apply(position);
161+
}
162+
}
163+
108164
private static class TestEventStream {
109165

110166
private static final String NOW_POSITION = "now";
111167

112168
private final List<BoxEvent> events = new ArrayList<>();
113169

114170
void addEvent(final int eventId) {
115-
final BoxEvent boxEvent = new BoxEvent(null, "{\"event_id\": \"%d\"}".formatted(eventId));
116-
events.add(boxEvent);
171+
events.add(createBoxEvent(eventId));
117172
}
118173

119174
EventLog consume(final String position) {
175+
final String nextPosition = String.valueOf(events.size());
176+
120177
if (NOW_POSITION.equals(position)) {
121-
return createEmptyEventLog();
178+
return createEventLog(emptyList(), nextPosition);
122179
}
123180

124181
final int streamPosition = Integer.parseInt(position);
125182
if (streamPosition > events.size()) {
126183
// Real Box API returns the latest offset position, even if streamPosition was greater.
127-
return createEmptyEventLog();
184+
return createEventLog(emptyList(), nextPosition);
128185
}
129186

130187
final List<BoxEvent> consumedEvents = events.subList(streamPosition, events.size());
131188

132-
return createEventLog(consumedEvents);
189+
return createEventLog(consumedEvents, nextPosition);
133190
}
191+
}
134192

135-
private EventLog createEmptyEventLog() {
136-
return createEventLog(emptyList());
137-
}
193+
private static BoxEvent createBoxEvent(final int eventId) {
194+
return new BoxEvent(null, "{\"event_id\": \"%d\"}".formatted(eventId));
195+
}
138196

139-
private EventLog createEventLog(final List<BoxEvent> consumedEvents) {
140-
// EventLog is not designed for being extended. Thus, mocking it.
141-
final EventLog eventLog = mock();
197+
private static EventLog createEventLog(final List<BoxEvent> consumedEvents, final String nextPosition) {
198+
// EventLog is not designed for being extended. Thus, mocking it.
199+
final EventLog eventLog = mock();
142200

143-
when(eventLog.getNextStreamPosition()).thenReturn(String.valueOf(events.size()));
144-
lenient().when(eventLog.getSize()).thenReturn(consumedEvents.size());
145-
lenient().when(eventLog.iterator()).thenReturn(consumedEvents.iterator());
201+
when(eventLog.getNextStreamPosition()).thenReturn(nextPosition);
202+
lenient().when(eventLog.getSize()).thenReturn(consumedEvents.size());
203+
lenient().when(eventLog.iterator()).thenReturn(consumedEvents.iterator());
146204

147-
return eventLog;
148-
}
205+
return eventLog;
149206
}
150207
}

0 commit comments

Comments
 (0)