diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java index 603e1e8255091..124386b9bc3ae 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java @@ -51,7 +51,8 @@ class KStreamKStreamJoin implements ProcessorSupplier implements ProcessorSupplier record) { return; } - boolean needOuterJoin = outer; // Emit all non-joined records which window has closed if (inputRecordTimestamp == sharedTimeTracker.streamTime) { outerJoinStore.ifPresent(store -> emitNonJoinedOuterRecords(store, record)); } + + boolean needOuterJoin = outer; try (final WindowStoreIterator iter = otherWindowStore.fetch(record.key(), timeFrom, timeTo)) { while (iter.hasNext()) { needOuterJoin = false; @@ -200,7 +202,7 @@ private void emitNonJoinedOuterRecords( // to reduce runtime cost, we try to avoid paying those cost // only try to emit left/outer join results if there _might_ be any result records - if (sharedTimeTracker.minTime >= sharedTimeTracker.streamTime - joinSpuriousLookBackTimeMs - joinGraceMs) { + if (sharedTimeTracker.minTime + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) { return; } // throttle the emit frequency to a (configurable) interval; @@ -222,6 +224,8 @@ private void emitNonJoinedOuterRecords( TimestampedKeyAndJoinSide prevKey = null; while (it.hasNext()) { + boolean outerJoinLeftBreak = false; + boolean outerJoinRightBreak = false; final KeyValue, LeftOrRightValue> next = it.next(); final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide = next.key; final LeftOrRightValue value = next.value; @@ -230,8 +234,19 @@ private void emitNonJoinedOuterRecords( sharedTimeTracker.minTime = timestamp; // Skip next records if window has not closed - if (timestamp + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) { - break; + final long outerJoinLookBackTimeMs = getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide); + if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + joinGraceMs >= sharedTimeTracker.streamTime) { + if (timestampedKeyAndJoinSide.isLeftSide()) { + outerJoinLeftBreak = true; // there are no more candidates to emit on left-outerJoin-side + } else { + outerJoinRightBreak = true; // there are no more candidates to emit on right-outerJoin-side + } + if (outerJoinLeftBreak && outerJoinRightBreak) { + break; // there are no more candidates to emit on left-outerJoin-side and + // right-outerJoin-side + } else { + continue; // there are possibly candidates left on the other outerJoin-side + } } final VOut nullJoinedValue; @@ -268,6 +283,15 @@ private void emitNonJoinedOuterRecords( } } + private long getOuterJoinLookBackTimeMs(final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide) { + // depending on the JoinSide we fill in the outerJoinLookBackTimeMs + if (timestampedKeyAndJoinSide.isLeftSide()) { + return windowsAfterMs; // On the left-JoinSide we look back in time + } else { + return windowsBeforeMs; // On the right-JoinSide we look forward in time + } + } + @Override public void close() { sharedTimeTrackerSupplier.remove(context().taskId()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java index 1d9a77b5bf495..10ab37cee0714 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.StreamsConfig.InternalConfig; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.JoinWindows; @@ -99,6 +100,7 @@ public void before(final TestInfo testInfo) throws IOException { final String safeTestName = safeUniqueTestName(testInfo); streamsConfig = getStreamsConfig(safeTestName); streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath); + streamsConfig.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, 0L); } @AfterEach diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java index 156b553455d47..fd36b241b27b0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java @@ -436,6 +436,184 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() { } } + @Test + public void testLeftJoinedRecordsWithZeroAfterAreEmitted() { + final StreamsBuilder builder = new StreamsBuilder(); + + final int[] expectedKeys = new int[] {0, 1, 2, 3}; + + final KStream stream1; + final KStream stream2; + final KStream joined; + final MockApiProcessorSupplier supplier = new MockApiProcessorSupplier<>(); + stream1 = builder.stream(topic1, consumed); + stream2 = builder.stream(topic2, consumed); + + joined = stream1.leftJoin( + stream2, + MockValueJoiner.TOSTRING_JOINER, + JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO), + StreamJoined.with(Serdes.Integer(), + Serdes.String(), + Serdes.String()) + ); + joined.process(supplier); + + final Collection> copartitionGroups = + TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); + + assertEquals(1, copartitionGroups.size()); + assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) { + final TestInputTopic inputTopic1 = + driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + final TestInputTopic inputTopic2 = + driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + final MockApiProcessor processor = supplier.theCapturedProcessor(); + + processor.init(null); + + // push four items with increasing timestamps to the primary stream; the other window is empty; + // this should emit the first three left-joined items; + // A3 is not triggered yet + // w1 = {} + // w2 = {} + // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } + // w2 = {} + long time = 1000L; + for (int i = 0; i < expectedKeys.length; i++) { + inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i], time + i); + } + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "A0+null", 1000L), + new KeyValueTimestamp<>(1, "A1+null", 1001L), + new KeyValueTimestamp<>(2, "A2+null", 1002L) + ); + + // push four items smaller timestamps (out of window) to the secondary stream; + // this should produce four joined items + // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } + // w2 = {} + // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } + // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999) } + time = 1000L - 1L; + for (final int expectedKey : expectedKeys) { + inputTopic2.pipeInput(expectedKey, "a" + expectedKey, time); + } + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "A0+a0", 1000L), + new KeyValueTimestamp<>(1, "A1+a1", 1001L), + new KeyValueTimestamp<>(2, "A2+a2", 1002L), + new KeyValueTimestamp<>(3, "A3+a3", 1003L) + ); + + // push four items with increased timestamps to the secondary stream; + // this should produce four joined item + // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } + // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999) } + // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } + // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999), + // 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000) } + time += 1L; + for (final int expectedKey : expectedKeys) { + inputTopic2.pipeInput(expectedKey, "b" + expectedKey, time); + } + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "A0+b0", 1000L), + new KeyValueTimestamp<>(1, "A1+b1", 1001L), + new KeyValueTimestamp<>(2, "A2+b2", 1002L), + new KeyValueTimestamp<>(3, "A3+b3", 1003L) + ); + + // push four items with increased timestamps to the secondary stream; + // this should produce only three joined items; + // c0 arrives too late to be joined with A0 + // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } + // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999), + // 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000) } + // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } + // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999), + // 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000), + // 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001) } + time += 1L; + for (final int expectedKey : expectedKeys) { + inputTopic2.pipeInput(expectedKey, "c" + expectedKey, time); + } + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(1, "A1+c1", 1001L), + new KeyValueTimestamp<>(2, "A2+c2", 1002L), + new KeyValueTimestamp<>(3, "A3+c3", 1003L) + ); + + // push four items with increased timestamps to the secondary stream; + // this should produce only two joined items; + // d0 and d1 arrive too late to be joined with A0 and A1 + // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } + // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999), + // 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000), + // 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001) } + // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } + // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999), + // 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000), + // 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001), + // 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002) } + time += 1L; + for (final int expectedKey : expectedKeys) { + inputTopic2.pipeInput(expectedKey, "d" + expectedKey, time); + } + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(2, "A2+d2", 1002L), + new KeyValueTimestamp<>(3, "A3+d3", 1003L) + ); + + // push four items with increased timestamps to the secondary stream; + // this should produce one joined item; + // only e3 can be joined with A3; + // e0, e1 and e2 arrive too late to be joined with A0, A1 and A2 + // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } + // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999), + // 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000), + // 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001), + // 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002) } + // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } + // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999), + // 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000), + // 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001), + // 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002), + // 0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003) } + time += 1L; + for (final int expectedKey : expectedKeys) { + inputTopic2.pipeInput(expectedKey, "e" + expectedKey, time); + } + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(3, "A3+e3", 1003L) + ); + + // push four items with larger timestamps to the secondary stream; + // no (non-)joined items can be produced + // + // w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } + // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999), + // 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000), + // 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001), + // 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002), + // 0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003) } + // --> w1 = { 0:A0 (ts: 1000), 1:A1 (ts: 1001), 2:A2 (ts: 1002), 3:A3 (ts: 1003) } + // w2 = { 0:a0 (ts: 999), 1:a1 (ts: 999), 2:a2 (ts: 999), 3:a3 (ts: 999), + // 0:b0 (ts: 1000), 1:b1 (ts: 1000), 2:b2 (ts: 1000), 3:b3 (ts: 1000), + // 0:c0 (ts: 1001), 1:c1 (ts: 1001), 2:c2 (ts: 1001), 3:c3 (ts: 1001), + // 0:d0 (ts: 1002), 1:d1 (ts: 1002), 2:d2 (ts: 1002), 3:d3 (ts: 1002), + // 0:e0 (ts: 1003), 1:e1 (ts: 1003), 2:e2 (ts: 1003), 3:e3 (ts: 1003), + // 0:f0 (ts: 1100), 1:f1 (ts: 1100), 2:f2 (ts: 1100), 3:f3 (ts: 1100) } + time = 1000 + 100L; + for (final int expectedKey : expectedKeys) { + inputTopic2.pipeInput(expectedKey, "f" + expectedKey, time); + } + processor.checkAndClearProcessResult(); + } + } + @Test public void testLeftJoinWithInMemoryCustomSuppliers() { final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceAndGrace(ofMillis(100L), ofMillis(0L)); @@ -609,8 +787,9 @@ public void testOrdering() { inputTopic1.pipeInput(1, "A1", 100L); processor.checkAndClearProcessResult(); - // push one item to the other window that has a join; this should produce non-joined records with a closed window first, then - // the joined records + // push one item to the other window that has a join; + // this should produce the joined record first; + // then non-joined record with a closed window // by the time they were produced before // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) } // w2 = { } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java index 099dc5b0c83fa..28a5f1488fbce 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java @@ -108,11 +108,11 @@ public void testOuterJoinDuplicatesWithFixDisabledOldApi() { inputTopic2.pipeInput(1, "b1", 0L); processor.checkAndClearProcessResult( - new KeyValueTimestamp<>(0, "A0+null", 0L), - new KeyValueTimestamp<>(0, "A0-0+null", 0L), - new KeyValueTimestamp<>(0, "A0+a0", 0L), - new KeyValueTimestamp<>(0, "A0-0+a0", 0L), - new KeyValueTimestamp<>(1, "null+b1", 0L) + new KeyValueTimestamp<>(0, "A0+null", 0L), + new KeyValueTimestamp<>(0, "A0-0+null", 0L), + new KeyValueTimestamp<>(0, "A0+a0", 0L), + new KeyValueTimestamp<>(0, "A0-0+a0", 0L), + new KeyValueTimestamp<>(1, "null+b1", 0L) ); } } @@ -438,13 +438,13 @@ public void testOrdering() { inputTopic1.pipeInput(1, "A1", 100L); processor.checkAndClearProcessResult(); - // push one item to the other window that has a join; this should produce non-joined records with a closed window first, then - // the joined records - // by the time they were produced before + // push one item to the other window that has a join; + // this should produce the not-joined record first; + // then the joined record // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) } // w2 = { } - // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 0) } - // --> w2 = { 0:a0 (ts: 110) } + // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) } + // --> w2 = { 1:a1 (ts: 110) } inputTopic2.pipeInput(1, "a1", 110L); processor.checkAndClearProcessResult( new KeyValueTimestamp<>(0, "A0+null", 0L), @@ -788,7 +788,7 @@ public void shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() { new KeyValueTimestamp<>(1, "A1+null", 1L) ); - // push one item to the other stream; this should not produce any items + // push one item to the other stream; this should produce one right-join item // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) } // w2 = { 0:a0 (ts: 100), 1:a1 (ts: 102) } // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) } @@ -841,7 +841,8 @@ public void shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() { final MockApiProcessor processor = supplier.theCapturedProcessor(); long time = 0L; - // push two items to the primary stream; the other window is empty; this should not produce any item + // push two items to the primary stream; the other window is empty; + // this should produce one left-joined item // w1 = {} // w2 = {} // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) } @@ -849,7 +850,9 @@ public void shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() { for (int i = 0; i < 2; i++) { inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i], time + i); } - processor.checkAndClearProcessResult(); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "A0+null", 0L) + ); // push one item to the other stream; this should produce one full-join item // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) } @@ -863,7 +866,8 @@ public void shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() { new KeyValueTimestamp<>(1, "A1+a1", 1L) ); - // push one item to the other stream; this should produce one left-join item + // push one item to the other stream; + // this should not produce any item // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) } // w2 = { 1:a1 (ts: 1) } // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) } @@ -871,9 +875,7 @@ public void shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() { time += 100; inputTopic2.pipeInput(expectedKeys[2], "a" + expectedKeys[2], time); - processor.checkAndClearProcessResult( - new KeyValueTimestamp<>(0, "A0+null", 0L) - ); + processor.checkAndClearProcessResult(); // push one item to the other stream; this should not produce any item // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) } @@ -884,11 +886,12 @@ public void shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() { processor.checkAndClearProcessResult(); - // push one item to the first stream; this should produce one full-join item + // push one item to the first stream; + // this should produce one inner-join item; // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1) } // w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) } // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 1), 2:A2 (ts: 201) } - // --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101 } + // --> w2 = { 1:a1 (ts: 1), 2:a2 (ts: 101), 3:a3 (ts: 101) } time += 100; inputTopic1.pipeInput(expectedKeys[2], "A" + expectedKeys[2], time);